aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/_framework/face/testing
diff options
context:
space:
mode:
authorGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2015-01-23 21:48:42 -0800
committerGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2015-01-23 21:48:42 -0800
commite2380338bfc9cec2f44877b3551d4e94490c1daa (patch)
tree52a464f9d3b64fe756098c5368f3ae426e24e466 /src/python/_framework/face/testing
parentcacae338b6cf32b8deedc26141990da78bd03034 (diff)
parent7a3c38bed9ea9e69ec748b9ed08cdf16cb6ac501 (diff)
Merge pull request #193 from nathanielmanistaatgoogle/python-introduction
Bring the rest of Python RPC Framework into GRPC.
Diffstat (limited to 'src/python/_framework/face/testing')
-rw-r--r--src/python/_framework/face/testing/__init__.py0
-rw-r--r--src/python/_framework/face/testing/base_util.py102
-rw-r--r--src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py223
-rw-r--r--src/python/_framework/face/testing/callback.py94
-rw-r--r--src/python/_framework/face/testing/control.py87
-rw-r--r--src/python/_framework/face/testing/coverage.py123
-rw-r--r--src/python/_framework/face/testing/digest.py446
-rw-r--r--src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py367
-rw-r--r--src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py377
-rw-r--r--src/python/_framework/face/testing/interfaces.py117
-rw-r--r--src/python/_framework/face/testing/serial.py70
-rw-r--r--src/python/_framework/face/testing/service.py337
-rw-r--r--src/python/_framework/face/testing/stock_service.py374
-rw-r--r--src/python/_framework/face/testing/test_case.py111
14 files changed, 2828 insertions, 0 deletions
diff --git a/src/python/_framework/face/testing/__init__.py b/src/python/_framework/face/testing/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/src/python/_framework/face/testing/__init__.py
diff --git a/src/python/_framework/face/testing/base_util.py b/src/python/_framework/face/testing/base_util.py
new file mode 100644
index 0000000000..d9ccb3af8f
--- /dev/null
+++ b/src/python/_framework/face/testing/base_util.py
@@ -0,0 +1,102 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for creating Base-layer objects for use in Face-layer tests."""
+
+import abc
+
+# interfaces is referenced from specification in this module.
+from _framework.base import util as _base_util
+from _framework.base.packets import implementations
+from _framework.base.packets import in_memory
+from _framework.base.packets import interfaces # pylint: disable=unused-import
+from _framework.foundation import logging_pool
+
+_POOL_SIZE_LIMIT = 20
+
+_MAXIMUM_TIMEOUT = 90
+
+
+class LinkedPair(object):
+ """A Front and Back that are linked to one another.
+
+ Attributes:
+ front: An interfaces.Front.
+ back: An interfaces.Back.
+ """
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def shut_down(self):
+ """Shuts down this object and releases its resources."""
+ raise NotImplementedError()
+
+
+class _LinkedPair(LinkedPair):
+
+ def __init__(self, front, back, pools):
+ self.front = front
+ self.back = back
+ self._pools = pools
+
+ def shut_down(self):
+ _base_util.wait_for_idle(self.front)
+ _base_util.wait_for_idle(self.back)
+
+ for pool in self._pools:
+ pool.shutdown(wait=True)
+
+
+def linked_pair(servicer, default_timeout):
+ """Creates a Server and Stub linked together for use."""
+ link_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ front_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ front_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ front_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ back_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ back_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ back_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ pools = (
+ link_pool,
+ front_work_pool, front_transmission_pool, front_utility_pool,
+ back_work_pool, back_transmission_pool, back_utility_pool)
+
+ link = in_memory.Link(link_pool)
+ front = implementations.front(
+ front_work_pool, front_transmission_pool, front_utility_pool)
+ back = implementations.back(
+ servicer, back_work_pool, back_transmission_pool, back_utility_pool,
+ default_timeout, _MAXIMUM_TIMEOUT)
+ front.join_rear_link(link)
+ link.join_fore_link(front)
+ back.join_fore_link(link)
+ link.join_rear_link(back)
+
+ return _LinkedPair(front, back, pools)
diff --git a/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py
new file mode 100644
index 0000000000..0b1a2f0bd2
--- /dev/null
+++ b/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py
@@ -0,0 +1,223 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A test to verify an implementation of the Face layer of RPC Framework."""
+
+# unittest is referenced from specification in this module.
+import abc
+import unittest # pylint: disable=unused-import
+
+from _framework.face import exceptions
+from _framework.face.testing import control
+from _framework.face.testing import coverage
+from _framework.face.testing import digest
+from _framework.face.testing import stock_service
+from _framework.face.testing import test_case
+
+_TIMEOUT = 3
+
+
+class BlockingInvocationInlineServiceTestCase(
+ test_case.FaceTestCase, coverage.BlockingCoverage):
+ """A test of the Face layer of RPC Framework.
+
+ Concrete subclasses must also extend unittest.TestCase.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.control = control.PauseFailControl()
+ self.digest = digest.digest(
+ stock_service.STOCK_TEST_SERVICE, self.control, None)
+
+ self.server, self.stub, self.memo = self.set_up_implementation(
+ self.digest.name, self.digest.methods,
+ self.digest.inline_unary_unary_methods,
+ self.digest.inline_unary_stream_methods,
+ self.digest.inline_stream_unary_methods,
+ self.digest.inline_stream_stream_methods,
+ {}, {}, {}, {}, None)
+
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.tear_down_implementation(self.memo)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response = self.stub.blocking_value_in_value_out(
+ name, request, _TIMEOUT)
+
+ test_messages.verify(request, response, self)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ response = self.stub.blocking_stream_in_value_out(
+ name, iter(requests), _TIMEOUT)
+
+ test_messages.verify(requests, response, self)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response = self.stub.blocking_value_in_value_out(
+ name, first_request, _TIMEOUT)
+
+ test_messages.verify(first_request, first_response, self)
+
+ second_response = self.stub.blocking_value_in_value_out(
+ name, second_request, _TIMEOUT)
+
+ test_messages.verify(second_request, second_response, self)
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ self.stub.blocking_value_in_value_out(name, request, _TIMEOUT)
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ list(response_iterator)
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT)
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ list(response_iterator)
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.fail(), self.assertRaises(exceptions.ServicerError):
+ self.stub.blocking_value_in_value_out(name, request, _TIMEOUT)
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.fail(), self.assertRaises(exceptions.ServicerError):
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ list(response_iterator)
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.fail(), self.assertRaises(exceptions.ServicerError):
+ self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT)
+
+ def testFailedStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.fail(), self.assertRaises(exceptions.ServicerError):
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ list(response_iterator)
diff --git a/src/python/_framework/face/testing/callback.py b/src/python/_framework/face/testing/callback.py
new file mode 100644
index 0000000000..7a20869abe
--- /dev/null
+++ b/src/python/_framework/face/testing/callback.py
@@ -0,0 +1,94 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A utility useful in tests of asynchronous, event-driven interfaces."""
+
+import threading
+
+from _framework.foundation import stream
+
+
+class Callback(stream.Consumer):
+ """A utility object useful in tests of asynchronous code."""
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._unary_response = None
+ self._streamed_responses = []
+ self._completed = False
+ self._abortion = None
+
+ def abort(self, abortion):
+ with self._condition:
+ self._abortion = abortion
+ self._condition.notify_all()
+
+ def complete(self, unary_response):
+ with self._condition:
+ self._unary_response = unary_response
+ self._completed = True
+ self._condition.notify_all()
+
+ def consume(self, streamed_response):
+ with self._condition:
+ self._streamed_responses.append(streamed_response)
+
+ def terminate(self):
+ with self._condition:
+ self._completed = True
+ self._condition.notify_all()
+
+ def consume_and_terminate(self, streamed_response):
+ with self._condition:
+ self._streamed_responses.append(streamed_response)
+ self._completed = True
+ self._condition.notify_all()
+
+ def block_until_terminated(self):
+ with self._condition:
+ while self._abortion is None and not self._completed:
+ self._condition.wait()
+
+ def response(self):
+ with self._condition:
+ if self._abortion is None:
+ return self._unary_response
+ else:
+ raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+
+ def responses(self):
+ with self._condition:
+ if self._abortion is None:
+ return list(self._streamed_responses)
+ else:
+ raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+
+ def abortion(self):
+ with self._condition:
+ return self._abortion
diff --git a/src/python/_framework/face/testing/control.py b/src/python/_framework/face/testing/control.py
new file mode 100644
index 0000000000..3960c4e649
--- /dev/null
+++ b/src/python/_framework/face/testing/control.py
@@ -0,0 +1,87 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Code for instructing systems under test to block or fail."""
+
+import abc
+import contextlib
+import threading
+
+
+class Control(object):
+ """An object that accepts program control from a system under test.
+
+ Systems under test passed a Control should call its control() method
+ frequently during execution. The control() method may block, raise an
+ exception, or do nothing, all according to the enclosing test's desire for
+ the system under test to simulate hanging, failing, or functioning.
+ """
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def control(self):
+ """Potentially does anything."""
+ raise NotImplementedError()
+
+
+class PauseFailControl(Control):
+ """A Control that can be used to pause or fail code under control."""
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._paused = False
+ self._fail = False
+
+ def control(self):
+ with self._condition:
+ if self._fail:
+ raise ValueError()
+
+ while self._paused:
+ self._condition.wait()
+
+ @contextlib.contextmanager
+ def pause(self):
+ """Pauses code under control while controlling code is in context."""
+ with self._condition:
+ self._paused = True
+ yield
+ with self._condition:
+ self._paused = False
+ self._condition.notify_all()
+
+ @contextlib.contextmanager
+ def fail(self):
+ """Fails code under control while controlling code is in context."""
+ with self._condition:
+ self._fail = True
+ yield
+ with self._condition:
+ self._fail = False
diff --git a/src/python/_framework/face/testing/coverage.py b/src/python/_framework/face/testing/coverage.py
new file mode 100644
index 0000000000..f3aca113fe
--- /dev/null
+++ b/src/python/_framework/face/testing/coverage.py
@@ -0,0 +1,123 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Governs coverage for the tests of the Face layer of RPC Framework."""
+
+import abc
+
+# These classes are only valid when inherited by unittest.TestCases.
+# pylint: disable=invalid-name
+
+
+class BlockingCoverage(object):
+ """Specification of test coverage for blocking behaviors."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testSuccessfulStreamRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testSequentialInvocations(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testExpiredUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testExpiredUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testExpiredStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testExpiredStreamRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testFailedUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testFailedUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testFailedStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testFailedStreamRequestStreamResponse(self):
+ raise NotImplementedError()
+
+
+class FullCoverage(BlockingCoverage):
+ """Specification of test coverage for non-blocking behaviors."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def testParallelInvocations(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testCancelledUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testCancelledUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testCancelledStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testCancelledStreamRequestStreamResponse(self):
+ raise NotImplementedError()
diff --git a/src/python/_framework/face/testing/digest.py b/src/python/_framework/face/testing/digest.py
new file mode 100644
index 0000000000..8d1291c975
--- /dev/null
+++ b/src/python/_framework/face/testing/digest.py
@@ -0,0 +1,446 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Code for making a service.TestService more amenable to use in tests."""
+
+import collections
+import threading
+
+# testing_control, interfaces, and testing_service are referenced from
+# specification in this module.
+from _framework.face import exceptions
+from _framework.face import interfaces as face_interfaces
+from _framework.face.testing import control as testing_control # pylint: disable=unused-import
+from _framework.face.testing import interfaces # pylint: disable=unused-import
+from _framework.face.testing import service as testing_service # pylint: disable=unused-import
+from _framework.foundation import stream
+from _framework.foundation import stream_util
+
+_IDENTITY = lambda x: x
+
+
+class TestServiceDigest(
+ collections.namedtuple(
+ 'TestServiceDigest',
+ ['name',
+ 'methods',
+ 'inline_unary_unary_methods',
+ 'inline_unary_stream_methods',
+ 'inline_stream_unary_methods',
+ 'inline_stream_stream_methods',
+ 'event_unary_unary_methods',
+ 'event_unary_stream_methods',
+ 'event_stream_unary_methods',
+ 'event_stream_stream_methods',
+ 'multi_method',
+ 'unary_unary_messages_sequences',
+ 'unary_stream_messages_sequences',
+ 'stream_unary_messages_sequences',
+ 'stream_stream_messages_sequences'])):
+ """A transformation of a service.TestService.
+
+ Attributes:
+ name: The RPC service name to be used in the test.
+ methods: A sequence of interfaces.Method objects describing the RPC
+ methods that will be called during the test.
+ inline_unary_unary_methods: A dict from method name to
+ face_interfaces.InlineValueInValueOutMethod object to be used in tests of
+ in-line calls to behaviors under test.
+ inline_unary_stream_methods: A dict from method name to
+ face_interfaces.InlineValueInStreamOutMethod object to be used in tests of
+ in-line calls to behaviors under test.
+ inline_stream_unary_methods: A dict from method name to
+ face_interfaces.InlineStreamInValueOutMethod object to be used in tests of
+ in-line calls to behaviors under test.
+ inline_stream_stream_methods: A dict from method name to
+ face_interfaces.InlineStreamInStreamOutMethod object to be used in tests
+ of in-line calls to behaviors under test.
+ event_unary_unary_methods: A dict from method name to
+ face_interfaces.EventValueInValueOutMethod object to be used in tests of
+ event-driven calls to behaviors under test.
+ event_unary_stream_methods: A dict from method name to
+ face_interfaces.EventValueInStreamOutMethod object to be used in tests of
+ event-driven calls to behaviors under test.
+ event_stream_unary_methods: A dict from method name to
+ face_interfaces.EventStreamInValueOutMethod object to be used in tests of
+ event-driven calls to behaviors under test.
+ event_stream_stream_methods: A dict from method name to
+ face_interfaces.EventStreamInStreamOutMethod object to be used in tests of
+ event-driven calls to behaviors under test.
+ multi_method: A face_interfaces.MultiMethod to be used in tests of generic
+ calls to behaviors under test.
+ unary_unary_messages_sequences: A dict from method name to sequence of
+ service.UnaryUnaryTestMessages objects to be used to test the method
+ with the given name.
+ unary_stream_messages_sequences: A dict from method name to sequence of
+ service.UnaryStreamTestMessages objects to be used to test the method
+ with the given name.
+ stream_unary_messages_sequences: A dict from method name to sequence of
+ service.StreamUnaryTestMessages objects to be used to test the method
+ with the given name.
+ stream_stream_messages_sequences: A dict from method name to sequence of
+ service.StreamStreamTestMessages objects to be used to test the
+ method with the given name.
+ serialization: A serial.Serialization object describing serialization
+ behaviors for all the RPC methods.
+ """
+
+
+class _BufferingConsumer(stream.Consumer):
+ """A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
+
+ def __init__(self):
+ self.consumed = []
+ self.terminated = False
+
+ def consume(self, value):
+ self.consumed.append(value)
+
+ def terminate(self):
+ self.terminated = True
+
+ def consume_and_terminate(self, value):
+ self.consumed.append(value)
+ self.terminated = True
+
+
+class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod):
+
+ def __init__(self, unary_unary_test_method, control):
+ self._test_method = unary_unary_test_method
+ self._control = control
+
+ def service(self, request, context):
+ response_list = []
+ self._test_method.service(
+ request, response_list.append, context, self._control)
+ return response_list.pop(0)
+
+
+class _EventUnaryUnaryMethod(face_interfaces.EventValueInValueOutMethod):
+
+ def __init__(self, unary_unary_test_method, control, pool):
+ self._test_method = unary_unary_test_method
+ self._control = control
+ self._pool = pool
+
+ def service(self, request, response_callback, context):
+ if self._pool is None:
+ self._test_method.service(
+ request, response_callback, context, self._control)
+ else:
+ self._pool.submit(
+ self._test_method.service, request, response_callback, context,
+ self._control)
+
+
+class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod):
+
+ def __init__(self, unary_stream_test_method, control):
+ self._test_method = unary_stream_test_method
+ self._control = control
+
+ def service(self, request, context):
+ response_consumer = _BufferingConsumer()
+ self._test_method.service(
+ request, response_consumer, context, self._control)
+ for response in response_consumer.consumed:
+ yield response
+
+
+class _EventUnaryStreamMethod(face_interfaces.EventValueInStreamOutMethod):
+
+ def __init__(self, unary_stream_test_method, control, pool):
+ self._test_method = unary_stream_test_method
+ self._control = control
+ self._pool = pool
+
+ def service(self, request, response_consumer, context):
+ if self._pool is None:
+ self._test_method.service(
+ request, response_consumer, context, self._control)
+ else:
+ self._pool.submit(
+ self._test_method.service, request, response_consumer, context,
+ self._control)
+
+
+class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod):
+
+ def __init__(self, stream_unary_test_method, control):
+ self._test_method = stream_unary_test_method
+ self._control = control
+
+ def service(self, request_iterator, context):
+ response_list = []
+ request_consumer = self._test_method.service(
+ response_list.append, context, self._control)
+ for request in request_iterator:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ return response_list.pop(0)
+
+
+class _EventStreamUnaryMethod(face_interfaces.EventStreamInValueOutMethod):
+
+ def __init__(self, stream_unary_test_method, control, pool):
+ self._test_method = stream_unary_test_method
+ self._control = control
+ self._pool = pool
+
+ def service(self, response_callback, context):
+ request_consumer = self._test_method.service(
+ response_callback, context, self._control)
+ if self._pool is None:
+ return request_consumer
+ else:
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod):
+
+ def __init__(self, stream_stream_test_method, control):
+ self._test_method = stream_stream_test_method
+ self._control = control
+
+ def service(self, request_iterator, context):
+ response_consumer = _BufferingConsumer()
+ request_consumer = self._test_method.service(
+ response_consumer, context, self._control)
+
+ for request in request_iterator:
+ request_consumer.consume(request)
+ while response_consumer.consumed:
+ yield response_consumer.consumed.pop(0)
+ response_consumer.terminate()
+
+
+class _EventStreamStreamMethod(face_interfaces.EventStreamInStreamOutMethod):
+
+ def __init__(self, stream_stream_test_method, control, pool):
+ self._test_method = stream_stream_test_method
+ self._control = control
+ self._pool = pool
+
+ def service(self, response_consumer, context):
+ request_consumer = self._test_method.service(
+ response_consumer, context, self._control)
+ if self._pool is None:
+ return request_consumer
+ else:
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _UnaryConsumer(stream.Consumer):
+ """A Consumer that only allows consumption of exactly one value."""
+
+ def __init__(self, action):
+ self._lock = threading.Lock()
+ self._action = action
+ self._consumed = False
+ self._terminated = False
+
+ def consume(self, value):
+ with self._lock:
+ if self._consumed:
+ raise ValueError('Unary consumer already consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._consumed = True
+
+ self._action(value)
+
+ def terminate(self):
+ with self._lock:
+ if not self._consumed:
+ raise ValueError('Unary consumer hasn\'t yet consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._terminated = True
+
+ def consume_and_terminate(self, value):
+ with self._lock:
+ if self._consumed:
+ raise ValueError('Unary consumer already consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._consumed = True
+ self._terminated = True
+
+ self._action(value)
+
+
+class _UnaryUnaryAdaptation(object):
+
+ def __init__(self, unary_unary_test_method):
+ self._method = unary_unary_test_method
+
+ def service(self, response_consumer, context, control):
+ def action(request):
+ self._method.service(
+ request, response_consumer.consume_and_terminate, context, control)
+ return _UnaryConsumer(action)
+
+
+class _UnaryStreamAdaptation(object):
+
+ def __init__(self, unary_stream_test_method):
+ self._method = unary_stream_test_method
+
+ def service(self, response_consumer, context, control):
+ def action(request):
+ self._method.service(request, response_consumer, context, control)
+ return _UnaryConsumer(action)
+
+
+class _StreamUnaryAdaptation(object):
+
+ def __init__(self, stream_unary_test_method):
+ self._method = stream_unary_test_method
+
+ def service(self, response_consumer, context, control):
+ return self._method.service(
+ response_consumer.consume_and_terminate, context, control)
+
+
+class _MultiMethod(face_interfaces.MultiMethod):
+
+ def __init__(self, methods, control, pool):
+ self._methods = methods
+ self._control = control
+ self._pool = pool
+
+ def service(self, name, response_consumer, context):
+ method = self._methods.get(name, None)
+ if method is None:
+ raise exceptions.NoSuchMethodError(name)
+ elif self._pool is None:
+ return method(response_consumer, context, self._control)
+ else:
+ request_consumer = method(response_consumer, context, self._control)
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _Assembly(
+ collections.namedtuple(
+ '_Assembly',
+ ['methods', 'inlines', 'events', 'adaptations', 'messages'])):
+ """An intermediate structure created when creating a TestServiceDigest."""
+
+
+def _assemble(
+ scenarios, names, inline_method_constructor, event_method_constructor,
+ adapter, control, pool):
+ """Creates an _Assembly from the given scenarios."""
+ methods = []
+ inlines = {}
+ events = {}
+ adaptations = {}
+ messages = {}
+ for name, scenario in scenarios.iteritems():
+ if name in names:
+ raise ValueError('Repeated name "%s"!' % name)
+
+ test_method = scenario[0]
+ inline_method = inline_method_constructor(test_method, control)
+ event_method = event_method_constructor(test_method, control, pool)
+ adaptation = adapter(test_method)
+
+ methods.append(test_method)
+ inlines[name] = inline_method
+ events[name] = event_method
+ adaptations[name] = adaptation
+ messages[name] = scenario[1]
+
+ return _Assembly(methods, inlines, events, adaptations, messages)
+
+
+def digest(service, control, pool):
+ """Creates a TestServiceDigest from a TestService.
+
+ Args:
+ service: A testing_service.TestService.
+ control: A testing_control.Control.
+ pool: If RPC methods should be serviced in a separate thread, a thread pool.
+ None if RPC methods should be serviced in the thread belonging to the
+ run-time that calls for their service.
+
+ Returns:
+ A TestServiceDigest synthesized from the given service.TestService.
+ """
+ names = set()
+
+ unary_unary = _assemble(
+ service.unary_unary_scenarios(), names, _InlineUnaryUnaryMethod,
+ _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool)
+ names.update(set(unary_unary.inlines))
+
+ unary_stream = _assemble(
+ service.unary_stream_scenarios(), names, _InlineUnaryStreamMethod,
+ _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool)
+ names.update(set(unary_stream.inlines))
+
+ stream_unary = _assemble(
+ service.stream_unary_scenarios(), names, _InlineStreamUnaryMethod,
+ _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool)
+ names.update(set(stream_unary.inlines))
+
+ stream_stream = _assemble(
+ service.stream_stream_scenarios(), names, _InlineStreamStreamMethod,
+ _EventStreamStreamMethod, _IDENTITY, control, pool)
+ names.update(set(stream_stream.inlines))
+
+ methods = list(unary_unary.methods)
+ methods.extend(unary_stream.methods)
+ methods.extend(stream_unary.methods)
+ methods.extend(stream_stream.methods)
+ adaptations = dict(unary_unary.adaptations)
+ adaptations.update(unary_stream.adaptations)
+ adaptations.update(stream_unary.adaptations)
+ adaptations.update(stream_stream.adaptations)
+
+ return TestServiceDigest(
+ service.name(),
+ methods,
+ unary_unary.inlines,
+ unary_stream.inlines,
+ stream_unary.inlines,
+ stream_stream.inlines,
+ unary_unary.events,
+ unary_stream.events,
+ stream_unary.events,
+ stream_stream.events,
+ _MultiMethod(adaptations, control, pool),
+ unary_unary.messages,
+ unary_stream.messages,
+ stream_unary.messages,
+ stream_stream.messages)
diff --git a/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py
new file mode 100644
index 0000000000..dba73a9368
--- /dev/null
+++ b/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py
@@ -0,0 +1,367 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A test to verify an implementation of the Face layer of RPC Framework."""
+
+import abc
+import unittest
+
+from _framework.face import interfaces
+from _framework.face.testing import callback as testing_callback
+from _framework.face.testing import control
+from _framework.face.testing import coverage
+from _framework.face.testing import digest
+from _framework.face.testing import stock_service
+from _framework.face.testing import test_case
+
+_TIMEOUT = 3
+
+
+class EventInvocationSynchronousEventServiceTestCase(
+ test_case.FaceTestCase, coverage.FullCoverage):
+ """A test of the Face layer of RPC Framework.
+
+ Concrete subclasses must also extend unittest.TestCase.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.control = control.PauseFailControl()
+ self.digest = digest.digest(
+ stock_service.STOCK_TEST_SERVICE, self.control, None)
+
+ self.server, self.stub, self.memo = self.set_up_implementation(
+ self.digest.name, self.digest.methods,
+ {}, {}, {}, {},
+ self.digest.event_unary_unary_methods,
+ self.digest.event_unary_stream_methods,
+ self.digest.event_stream_unary_methods,
+ self.digest.event_stream_stream_methods,
+ None)
+
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.tear_down_implementation(self.memo)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ self.stub.event_value_in_value_out(
+ name, request, callback.complete, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+ response = callback.response()
+
+ test_messages.verify(request, response, self)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ self.stub.event_value_in_stream_out(
+ name, request, callback, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+ responses = callback.responses()
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ unused_call, request_consumer = self.stub.event_stream_in_value_out(
+ name, callback.complete, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ callback.block_until_terminated()
+ response = callback.response()
+
+ test_messages.verify(requests, response, self)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ unused_call, request_consumer = self.stub.event_stream_in_stream_out(
+ name, callback, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ callback.block_until_terminated()
+ responses = callback.responses()
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ # pylint: disable=cell-var-from-loop
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+ first_callback = testing_callback.Callback()
+ second_callback = testing_callback.Callback()
+
+ def make_second_invocation(first_response):
+ first_callback.complete(first_response)
+ self.stub.event_value_in_value_out(
+ name, second_request, second_callback.complete,
+ second_callback.abort, _TIMEOUT)
+
+ self.stub.event_value_in_value_out(
+ name, first_request, make_second_invocation, first_callback.abort,
+ _TIMEOUT)
+ second_callback.block_until_terminated()
+
+ first_response = first_callback.response()
+ second_response = second_callback.response()
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ with self.control.pause():
+ self.stub.event_value_in_value_out(
+ name, request, callback.complete, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.EXPIRED, callback.abortion())
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ with self.control.pause():
+ self.stub.event_value_in_stream_out(
+ name, request, callback, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.EXPIRED, callback.abortion())
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for unused_test_messages in test_messages_sequence:
+ callback = testing_callback.Callback()
+
+ self.stub.event_stream_in_value_out(
+ name, callback.complete, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.EXPIRED, callback.abortion())
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ unused_call, request_consumer = self.stub.event_stream_in_stream_out(
+ name, callback, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.EXPIRED, callback.abortion())
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ with self.control.fail():
+ self.stub.event_value_in_value_out(
+ name, request, callback.complete, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ with self.control.fail():
+ self.stub.event_value_in_stream_out(
+ name, request, callback, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ with self.control.fail():
+ unused_call, request_consumer = self.stub.event_stream_in_value_out(
+ name, callback.complete, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+
+ def testFailedStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ with self.control.fail():
+ unused_call, request_consumer = self.stub.event_stream_in_stream_out(
+ name, callback, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+
+ def testParallelInvocations(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ first_callback = testing_callback.Callback()
+ second_request = test_messages.request()
+ second_callback = testing_callback.Callback()
+
+ self.stub.event_value_in_value_out(
+ name, first_request, first_callback.complete, first_callback.abort,
+ _TIMEOUT)
+ self.stub.event_value_in_value_out(
+ name, second_request, second_callback.complete,
+ second_callback.abort, _TIMEOUT)
+ first_callback.block_until_terminated()
+ second_callback.block_until_terminated()
+
+ first_response = first_callback.response()
+ second_response = second_callback.response()
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
+ @unittest.skip('TODO(nathaniel): implement.')
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ raise NotImplementedError()
+
+ def testCancelledUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ with self.control.pause():
+ call = self.stub.event_value_in_value_out(
+ name, request, callback.complete, callback.abort, _TIMEOUT)
+ call.cancel()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.CANCELLED, callback.abortion())
+
+ def testCancelledUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ call = self.stub.event_value_in_stream_out(
+ name, request, callback, callback.abort, _TIMEOUT)
+ call.cancel()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.CANCELLED, callback.abortion())
+
+ def testCancelledStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ call, request_consumer = self.stub.event_stream_in_value_out(
+ name, callback.complete, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ call.cancel()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.CANCELLED, callback.abortion())
+
+ def testCancelledStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for unused_test_messages in test_messages_sequence:
+ callback = testing_callback.Callback()
+
+ call, unused_request_consumer = self.stub.event_stream_in_stream_out(
+ name, callback, callback.abort, _TIMEOUT)
+ call.cancel()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.CANCELLED, callback.abortion())
diff --git a/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
new file mode 100644
index 0000000000..cf8b2eeb95
--- /dev/null
+++ b/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
@@ -0,0 +1,377 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A test to verify an implementation of the Face layer of RPC Framework."""
+
+import abc
+import contextlib
+import threading
+import unittest
+
+from _framework.face import exceptions
+from _framework.face.testing import control
+from _framework.face.testing import coverage
+from _framework.face.testing import digest
+from _framework.face.testing import stock_service
+from _framework.face.testing import test_case
+from _framework.foundation import future
+from _framework.foundation import logging_pool
+
+_TIMEOUT = 3
+_MAXIMUM_POOL_SIZE = 100
+
+
+class _PauseableIterator(object):
+
+ def __init__(self, upstream):
+ self._upstream = upstream
+ self._condition = threading.Condition()
+ self._paused = False
+
+ @contextlib.contextmanager
+ def pause(self):
+ with self._condition:
+ self._paused = True
+ yield
+ with self._condition:
+ self._paused = False
+ self._condition.notify_all()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ with self._condition:
+ while self._paused:
+ self._condition.wait()
+ return next(self._upstream)
+
+
+class FutureInvocationAsynchronousEventServiceTestCase(
+ test_case.FaceTestCase, coverage.FullCoverage):
+ """A test of the Face layer of RPC Framework.
+
+ Concrete subclasses must also extend unittest.TestCase.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.control = control.PauseFailControl()
+ self.digest_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
+ self.digest = digest.digest(
+ stock_service.STOCK_TEST_SERVICE, self.control, self.digest_pool)
+
+ self.server, self.stub, self.memo = self.set_up_implementation(
+ self.digest.name, self.digest.methods,
+ {}, {}, {}, {},
+ self.digest.event_unary_unary_methods,
+ self.digest.event_unary_stream_methods,
+ self.digest.event_stream_unary_methods,
+ self.digest.event_stream_stream_methods,
+ None)
+
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.tear_down_implementation(self.memo)
+ self.digest_pool.shutdown(wait=True)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_future = self.stub.future_value_in_value_out(
+ name, request, _TIMEOUT)
+ response = response_future.outcome().return_value
+
+ test_messages.verify(request, response, self)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ request_iterator = _PauseableIterator(iter(requests))
+
+ # Use of a paused iterator of requests allows us to test that control is
+ # returned to calling code before the iterator yields any requests.
+ with request_iterator.pause():
+ response_future = self.stub.future_stream_in_value_out(
+ name, request_iterator, _TIMEOUT)
+ response = response_future.outcome().return_value
+
+ test_messages.verify(requests, response, self)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ request_iterator = _PauseableIterator(iter(requests))
+
+ # Use of a paused iterator of requests allows us to test that control is
+ # returned to calling code before the iterator yields any requests.
+ with request_iterator.pause():
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, request_iterator, _TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self.stub.future_value_in_value_out(
+ name, first_request, _TIMEOUT)
+ first_response = first_response_future.outcome().return_value
+
+ test_messages.verify(first_request, first_response, self)
+
+ second_response_future = self.stub.future_value_in_value_out(
+ name, second_request, _TIMEOUT)
+ second_response = second_response_future.outcome().return_value
+
+ test_messages.verify(second_request, second_response, self)
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause():
+ response_future = self.stub.future_value_in_value_out(
+ name, request, _TIMEOUT)
+ outcome = response_future.outcome()
+
+ self.assertIsInstance(
+ outcome.exception, exceptions.ExpirationError)
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ list(response_iterator)
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause():
+ response_future = self.stub.future_stream_in_value_out(
+ name, iter(requests), _TIMEOUT)
+ outcome = response_future.outcome()
+
+ self.assertIsInstance(
+ outcome.exception, exceptions.ExpirationError)
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ list(response_iterator)
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.fail():
+ response_future = self.stub.future_value_in_value_out(
+ name, request, _TIMEOUT)
+ outcome = response_future.outcome()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_callback before the
+ # expiration of the RPC.
+ self.assertIsInstance(outcome.exception, exceptions.ExpirationError)
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_consumer before the
+ # expiration of the RPC.
+ with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ list(response_iterator)
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.fail():
+ response_future = self.stub.future_stream_in_value_out(
+ name, iter(requests), _TIMEOUT)
+ outcome = response_future.outcome()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_callback before the
+ # expiration of the RPC.
+ self.assertIsInstance(outcome.exception, exceptions.ExpirationError)
+
+ def testFailedStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_consumer before the
+ # expiration of the RPC.
+ with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ list(response_iterator)
+
+ def testParallelInvocations(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self.stub.future_value_in_value_out(
+ name, first_request, _TIMEOUT)
+ second_response_future = self.stub.future_value_in_value_out(
+ name, second_request, _TIMEOUT)
+ first_response = first_response_future.outcome().return_value
+ second_response = second_response_future.outcome().return_value
+
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
+ @unittest.skip('TODO(nathaniel): implement.')
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ raise NotImplementedError()
+
+ def testCancelledUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause():
+ response_future = self.stub.future_value_in_value_out(
+ name, request, _TIMEOUT)
+ cancelled = response_future.cancel()
+
+ self.assertFalse(cancelled)
+ self.assertEqual(future.ABORTED, response_future.outcome().category)
+
+ def testCancelledUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause():
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ response_iterator.cancel()
+
+ with self.assertRaises(exceptions.CancellationError):
+ next(response_iterator)
+
+ def testCancelledStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause():
+ response_future = self.stub.future_stream_in_value_out(
+ name, iter(requests), _TIMEOUT)
+ cancelled = response_future.cancel()
+
+ self.assertFalse(cancelled)
+ self.assertEqual(future.ABORTED, response_future.outcome().category)
+
+ def testCancelledStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause():
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ response_iterator.cancel()
+
+ with self.assertRaises(exceptions.CancellationError):
+ next(response_iterator)
diff --git a/src/python/_framework/face/testing/interfaces.py b/src/python/_framework/face/testing/interfaces.py
new file mode 100644
index 0000000000..253f6f118d
--- /dev/null
+++ b/src/python/_framework/face/testing/interfaces.py
@@ -0,0 +1,117 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces implemented by data sets used in Face-layer tests."""
+
+import abc
+
+# cardinality is referenced from specification in this module.
+from _framework.common import cardinality # pylint: disable=unused-import
+
+
+class Method(object):
+ """An RPC method to be used in tests of RPC implementations."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def name(self):
+ """Identify the name of the method.
+
+ Returns:
+ The name of the method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cardinality(self):
+ """Identify the cardinality of the method.
+
+ Returns:
+ A cardinality.Cardinality value describing the streaming semantics of the
+ method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def request_class(self):
+ """Identify the class used for the method's request objects.
+
+ Returns:
+ The class object of the class to which the method's request objects
+ belong.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def response_class(self):
+ """Identify the class used for the method's response objects.
+
+ Returns:
+ The class object of the class to which the method's response objects
+ belong.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_request(self, request):
+ """Serialize the given request object.
+
+ Args:
+ request: A request object appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_request(self, serialized_request):
+ """Synthesize a request object from a given bytestring.
+
+ Args:
+ serialized_request: A bytestring deserializable into a request object
+ appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_response(self, response):
+ """Serialize the given response object.
+
+ Args:
+ response: A response object appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_response(self, serialized_response):
+ """Synthesize a response object from a given bytestring.
+
+ Args:
+ serialized_response: A bytestring deserializable into a response object
+ appropriate for this method.
+ """
+ raise NotImplementedError()
diff --git a/src/python/_framework/face/testing/serial.py b/src/python/_framework/face/testing/serial.py
new file mode 100644
index 0000000000..47fc5822de
--- /dev/null
+++ b/src/python/_framework/face/testing/serial.py
@@ -0,0 +1,70 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utility for serialization in the context of test RPC services."""
+
+import collections
+
+
+class Serialization(
+ collections.namedtuple(
+ '_Serialization',
+ ['request_serializers',
+ 'request_deserializers',
+ 'response_serializers',
+ 'response_deserializers'])):
+ """An aggregation of serialization behaviors for an RPC service.
+
+ Attributes:
+ request_serializers: A dict from method name to request object serializer
+ behavior.
+ request_deserializers: A dict from method name to request object
+ deserializer behavior.
+ response_serializers: A dict from method name to response object serializer
+ behavior.
+ response_deserializers: A dict from method name to response object
+ deserializer behavior.
+ """
+
+
+def serialization(methods):
+ """Creates a Serialization from a sequences of interfaces.Method objects."""
+ request_serializers = {}
+ request_deserializers = {}
+ response_serializers = {}
+ response_deserializers = {}
+ for method in methods:
+ name = method.name()
+ request_serializers[name] = method.serialize_request
+ request_deserializers[name] = method.deserialize_request
+ response_serializers[name] = method.serialize_response
+ response_deserializers[name] = method.deserialize_response
+ return Serialization(
+ request_serializers, request_deserializers, response_serializers,
+ response_deserializers)
diff --git a/src/python/_framework/face/testing/service.py b/src/python/_framework/face/testing/service.py
new file mode 100644
index 0000000000..771346ec2e
--- /dev/null
+++ b/src/python/_framework/face/testing/service.py
@@ -0,0 +1,337 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Private interfaces implemented by data sets used in Face-layer tests."""
+
+import abc
+
+# interfaces is referenced from specification in this module.
+from _framework.face import interfaces as face_interfaces # pylint: disable=unused-import
+from _framework.face.testing import interfaces
+
+
+class UnaryUnaryTestMethod(interfaces.Method):
+ """Like face_interfaces.EventValueInValueOutMethod but with a control."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request, response_callback, context, control):
+ """Services an RPC that accepts one message and produces one message.
+
+ Args:
+ request: The single request message for the RPC.
+ response_callback: A callback to be called to accept the response message
+ of the RPC.
+ context: An face_interfaces.RpcContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class UnaryUnaryTestMessages(object):
+ """A type for unary-request-unary-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def request(self):
+ """Affords a request message.
+
+ Implementations of this method should return a different message with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A request message.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, request, response, test_case):
+ """Verifies that the computed response matches the given request.
+
+ Args:
+ request: A request message.
+ response: A response message.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the request and response do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamTestMethod(interfaces.Method):
+ """Like face_interfaces.EventValueInStreamOutMethod but with a control."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request, response_consumer, context, control):
+ """Services an RPC that takes one message and produces a stream of messages.
+
+ Args:
+ request: The single request message for the RPC.
+ response_consumer: A stream.Consumer to be called to accept the response
+ messages of the RPC.
+ context: An RpcContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamTestMessages(object):
+ """A type for unary-request-stream-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def request(self):
+ """Affords a request message.
+
+ Implementations of this method should return a different message with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A request message.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, request, responses, test_case):
+ """Verifies that the computed responses match the given request.
+
+ Args:
+ request: A request message.
+ responses: A sequence of response messages.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the request and responses do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryTestMethod(interfaces.Method):
+ """Like face_interfaces.EventStreamInValueOutMethod but with a control."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, response_callback, context, control):
+ """Services an RPC that takes a stream of messages and produces one message.
+
+ Args:
+ response_callback: A callback to be called to accept the response message
+ of the RPC.
+ context: An RpcContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Returns:
+ A stream.Consumer with which to accept the request messages of the RPC.
+ The consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing messages to this object. Implementations must not assume that
+ this object will be called to completion of the request stream or even
+ called at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryTestMessages(object):
+ """A type for stream-request-unary-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def requests(self):
+ """Affords a sequence of request messages.
+
+ Implementations of this method should return a different sequences with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A sequence of request messages.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, requests, response, test_case):
+ """Verifies that the computed response matches the given requests.
+
+ Args:
+ requests: A sequence of request messages.
+ response: A response message.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the requests and response do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamTestMethod(interfaces.Method):
+ """Like face_interfaces.EventStreamInStreamOutMethod but with a control."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, response_consumer, context, control):
+ """Services an RPC that accepts and produces streams of messages.
+
+ Args:
+ response_consumer: A stream.Consumer to be called to accept the response
+ messages of the RPC.
+ context: An RpcContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Returns:
+ A stream.Consumer with which to accept the request messages of the RPC.
+ The consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing messages to this object. Implementations must not assume that
+ this object will be called to completion of the request stream or even
+ called at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamTestMessages(object):
+ """A type for stream-request-stream-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def requests(self):
+ """Affords a sequence of request messages.
+
+ Implementations of this method should return a different sequences with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A sequence of request messages.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, requests, responses, test_case):
+ """Verifies that the computed response matches the given requests.
+
+ Args:
+ requests: A sequence of request messages.
+ responses: A sequence of response messages.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the requests and responses do not match, indicating
+ that there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class TestService(object):
+ """A specification of implemented RPC methods to use in tests."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def name(self):
+ """Identifies the RPC service name used during the test.
+
+ Returns:
+ The RPC service name to be used for the test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_unary_scenarios(self):
+ """Affords unary-request-unary-response test methods and their messages.
+
+ Returns:
+ A dict from method name to pair. The first element of the pair
+ is a UnaryUnaryTestMethod object and the second element is a sequence
+ of UnaryUnaryTestMethodMessages objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_stream_scenarios(self):
+ """Affords unary-request-stream-response test methods and their messages.
+
+ Returns:
+ A dict from method name to pair. The first element of the pair is a
+ UnaryStreamTestMethod object and the second element is a sequence of
+ UnaryStreamTestMethodMessages objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_unary_scenarios(self):
+ """Affords stream-request-unary-response test methods and their messages.
+
+ Returns:
+ A dict from method name to pair. The first element of the pair is a
+ StreamUnaryTestMethod object and the second element is a sequence of
+ StreamUnaryTestMethodMessages objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_stream_scenarios(self):
+ """Affords stream-request-stream-response test methods and their messages.
+
+ Returns:
+ A dict from method name to pair. The first element of the pair is a
+ StreamStreamTestMethod object and the second element is a sequence of
+ StreamStreamTestMethodMessages objects.
+ """
+ raise NotImplementedError()
diff --git a/src/python/_framework/face/testing/stock_service.py b/src/python/_framework/face/testing/stock_service.py
new file mode 100644
index 0000000000..bd82877e83
--- /dev/null
+++ b/src/python/_framework/face/testing/stock_service.py
@@ -0,0 +1,374 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Examples of Python implementations of the stock.proto Stock service."""
+
+from _framework.common import cardinality
+from _framework.face.testing import service
+from _framework.foundation import abandonment
+from _framework.foundation import stream
+from _framework.foundation import stream_util
+from _junkdrawer import stock_pb2
+
+SYMBOL_FORMAT = 'test symbol:%03d'
+STREAM_LENGTH = 400
+
+# A test-appropriate security-pricing function. :-P
+_price = lambda symbol_name: float(hash(symbol_name) % 4096)
+
+
+def _get_last_trade_price(stock_request, stock_reply_callback, control, active):
+ """A unary-request, unary-response test method."""
+ control.control()
+ if active():
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=_price(stock_request.symbol)))
+ else:
+ raise abandonment.Abandoned()
+
+
+def _get_last_trade_price_multiple(stock_reply_consumer, control, active):
+ """A stream-request, stream-response test method."""
+ def stock_reply_for_stock_request(stock_request):
+ control.control()
+ if active():
+ return stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=_price(stock_request.symbol))
+ else:
+ raise abandonment.Abandoned()
+ return stream_util.TransformingConsumer(
+ stock_reply_for_stock_request, stock_reply_consumer)
+
+
+def _watch_future_trades(stock_request, stock_reply_consumer, control, active):
+ """A unary-request, stream-response test method."""
+ base_price = _price(stock_request.symbol)
+ for index in range(stock_request.num_trades_to_watch):
+ control.control()
+ if active():
+ stock_reply_consumer.consume(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=base_price + index))
+ else:
+ raise abandonment.Abandoned()
+ stock_reply_consumer.terminate()
+
+
+def _get_highest_trade_price(stock_reply_callback, control, active):
+ """A stream-request, unary-response test method."""
+
+ class StockRequestConsumer(stream.Consumer):
+ """Keeps an ongoing record of the most valuable symbol yet consumed."""
+
+ def __init__(self):
+ self._symbol = None
+ self._price = None
+
+ def consume(self, stock_request):
+ control.control()
+ if active():
+ if self._price is None:
+ self._symbol = stock_request.symbol
+ self._price = _price(stock_request.symbol)
+ else:
+ candidate_price = _price(stock_request.symbol)
+ if self._price < candidate_price:
+ self._symbol = stock_request.symbol
+ self._price = candidate_price
+
+ def terminate(self):
+ control.control()
+ if active():
+ if self._symbol is None:
+ raise ValueError()
+ else:
+ stock_reply_callback(
+ stock_pb2.StockReply(symbol=self._symbol, price=self._price))
+ self._symbol = None
+ self._price = None
+
+ def consume_and_terminate(self, stock_request):
+ control.control()
+ if active():
+ if self._price is None:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol,
+ price=_price(stock_request.symbol)))
+ else:
+ candidate_price = _price(stock_request.symbol)
+ if self._price < candidate_price:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=candidate_price))
+ else:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=self._symbol, price=self._price))
+
+ self._symbol = None
+ self._price = None
+
+ return StockRequestConsumer()
+
+
+class GetLastTradePrice(service.UnaryUnaryTestMethod):
+ """GetLastTradePrice for use in tests."""
+
+ def name(self):
+ return 'GetLastTradePrice'
+
+ def cardinality(self):
+ return cardinality.Cardinality.UNARY_UNARY
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, request, response_callback, context, control):
+ _get_last_trade_price(
+ request, response_callback, control, context.is_active)
+
+
+class GetLastTradePriceMessages(service.UnaryUnaryTestMessages):
+
+ def __init__(self):
+ self._index = 0
+
+ def request(self):
+ symbol = SYMBOL_FORMAT % self._index
+ self._index += 1
+ return stock_pb2.StockRequest(symbol=symbol)
+
+ def verify(self, request, response, test_case):
+ test_case.assertEqual(request.symbol, response.symbol)
+ test_case.assertEqual(_price(request.symbol), response.price)
+
+
+class GetLastTradePriceMultiple(service.StreamStreamTestMethod):
+ """GetLastTradePriceMultiple for use in tests."""
+
+ def name(self):
+ return 'GetLastTradePriceMultiple'
+
+ def cardinality(self):
+ return cardinality.Cardinality.STREAM_STREAM
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, response_consumer, context, control):
+ return _get_last_trade_price_multiple(
+ response_consumer, control, context.is_active)
+
+
+class GetLastTradePriceMultipleMessages(service.StreamStreamTestMessages):
+ """Pairs of message streams for use with GetLastTradePriceMultiple."""
+
+ def __init__(self):
+ self._index = 0
+
+ def requests(self):
+ base_index = self._index
+ self._index += 1
+ return [
+ stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % (base_index + index))
+ for index in range(STREAM_LENGTH)]
+
+ def verify(self, requests, responses, test_case):
+ test_case.assertEqual(len(requests), len(responses))
+ for stock_request, stock_reply in zip(requests, responses):
+ test_case.assertEqual(stock_request.symbol, stock_reply.symbol)
+ test_case.assertEqual(_price(stock_request.symbol), stock_reply.price)
+
+
+class WatchFutureTrades(service.UnaryStreamTestMethod):
+ """WatchFutureTrades for use in tests."""
+
+ def name(self):
+ return 'WatchFutureTrades'
+
+ def cardinality(self):
+ return cardinality.Cardinality.UNARY_STREAM
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, request, response_consumer, context, control):
+ _watch_future_trades(request, response_consumer, control, context.is_active)
+
+
+class WatchFutureTradesMessages(service.UnaryStreamTestMessages):
+ """Pairs of a single request message and a sequence of response messages."""
+
+ def __init__(self):
+ self._index = 0
+
+ def request(self):
+ symbol = SYMBOL_FORMAT % self._index
+ self._index += 1
+ return stock_pb2.StockRequest(
+ symbol=symbol, num_trades_to_watch=STREAM_LENGTH)
+
+ def verify(self, request, responses, test_case):
+ test_case.assertEqual(STREAM_LENGTH, len(responses))
+ base_price = _price(request.symbol)
+ for index, response in enumerate(responses):
+ test_case.assertEqual(base_price + index, response.price)
+
+
+class GetHighestTradePrice(service.StreamUnaryTestMethod):
+ """GetHighestTradePrice for use in tests."""
+
+ def name(self):
+ return 'GetHighestTradePrice'
+
+ def cardinality(self):
+ return cardinality.Cardinality.STREAM_UNARY
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, response_callback, context, control):
+ return _get_highest_trade_price(
+ response_callback, control, context.is_active)
+
+
+class GetHighestTradePriceMessages(service.StreamUnaryTestMessages):
+
+ def requests(self):
+ return [
+ stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % index)
+ for index in range(STREAM_LENGTH)]
+
+ def verify(self, requests, response, test_case):
+ price = None
+ symbol = None
+ for stock_request in requests:
+ current_symbol = stock_request.symbol
+ current_price = _price(current_symbol)
+ if price is None or price < current_price:
+ price = current_price
+ symbol = current_symbol
+ test_case.assertEqual(price, response.price)
+ test_case.assertEqual(symbol, response.symbol)
+
+
+class StockTestService(service.TestService):
+ """A corpus of test data with one method of each RPC cardinality."""
+
+ def name(self):
+ return 'Stock'
+
+ def unary_unary_scenarios(self):
+ return {
+ 'GetLastTradePrice': (
+ GetLastTradePrice(), [GetLastTradePriceMessages()]),
+ }
+
+ def unary_stream_scenarios(self):
+ return {
+ 'WatchFutureTrades': (
+ WatchFutureTrades(), [WatchFutureTradesMessages()]),
+ }
+
+ def stream_unary_scenarios(self):
+ return {
+ 'GetHighestTradePrice': (
+ GetHighestTradePrice(), [GetHighestTradePriceMessages()])
+ }
+
+ def stream_stream_scenarios(self):
+ return {
+ 'GetLastTradePriceMultiple': (
+ GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]),
+ }
+
+
+STOCK_TEST_SERVICE = StockTestService()
diff --git a/src/python/_framework/face/testing/test_case.py b/src/python/_framework/face/testing/test_case.py
new file mode 100644
index 0000000000..09b5a67f5a
--- /dev/null
+++ b/src/python/_framework/face/testing/test_case.py
@@ -0,0 +1,111 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tools for creating tests of implementations of the Face layer."""
+
+import abc
+
+# face_interfaces and interfaces are referenced in specification in this module.
+from _framework.face import interfaces as face_interfaces # pylint: disable=unused-import
+from _framework.face.testing import interfaces # pylint: disable=unused-import
+
+
+class FaceTestCase(object):
+ """Describes a test of the Face Layer of RPC Framework.
+
+ Concrete subclasses must also inherit from unittest.TestCase and from at least
+ one class that defines test methods.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_up_implementation(
+ self,
+ name,
+ methods,
+ inline_value_in_value_out_methods,
+ inline_value_in_stream_out_methods,
+ inline_stream_in_value_out_methods,
+ inline_stream_in_stream_out_methods,
+ event_value_in_value_out_methods,
+ event_value_in_stream_out_methods,
+ event_stream_in_value_out_methods,
+ event_stream_in_stream_out_methods,
+ multi_method):
+ """Instantiates the Face Layer implementation under test.
+
+ Args:
+ name: The service name to be used in the test.
+ methods: A sequence of interfaces.Method objects describing the RPC
+ methods that will be called during the test.
+ inline_value_in_value_out_methods: A dictionary from string method names
+ to face_interfaces.InlineValueInValueOutMethod implementations of those
+ methods.
+ inline_value_in_stream_out_methods: A dictionary from string method names
+ to face_interfaces.InlineValueInStreamOutMethod implementations of those
+ methods.
+ inline_stream_in_value_out_methods: A dictionary from string method names
+ to face_interfaces.InlineStreamInValueOutMethod implementations of those
+ methods.
+ inline_stream_in_stream_out_methods: A dictionary from string method names
+ to face_interfaces.InlineStreamInStreamOutMethod implementations of
+ those methods.
+ event_value_in_value_out_methods: A dictionary from string method names
+ to face_interfaces.EventValueInValueOutMethod implementations of those
+ methods.
+ event_value_in_stream_out_methods: A dictionary from string method names
+ to face_interfaces.EventValueInStreamOutMethod implementations of those
+ methods.
+ event_stream_in_value_out_methods: A dictionary from string method names
+ to face_interfaces.EventStreamInValueOutMethod implementations of those
+ methods.
+ event_stream_in_stream_out_methods: A dictionary from string method names
+ to face_interfaces.EventStreamInStreamOutMethod implementations of those
+ methods.
+ multi_method: An face_interfaces.MultiMethod, or None.
+
+ Returns:
+ A sequence of length three the first element of which is a
+ face_interfaces.Server, the second element of which is a
+ face_interfaces.Stub, (both of which are backed by the given method
+ implementations), and the third element of which is an arbitrary memo
+ object to be kept and passed to tearDownImplementation at the conclusion
+ of the test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def tear_down_implementation(self, memo):
+ """Destroys the Face layer implementation under test.
+
+ Args:
+ memo: The object from the third position of the return value of
+ set_up_implementation.
+ """
+ raise NotImplementedError()