aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/unit/framework/interfaces
diff options
context:
space:
mode:
authorGravatar kpayson64 <kpayson@google.com>2016-07-06 11:02:46 -0700
committerGravatar GitHub <noreply@github.com>2016-07-06 11:02:46 -0700
commitdcca468abe87f76bcb8fd85973781612742da9df (patch)
treed4caa183b69708de881b7e98b558c55b06dced92 /src/python/grpcio_tests/tests/unit/framework/interfaces
parent014b070a9491ba483dceb9f409c3426deccc17a7 (diff)
parent4763678016a253b5ed2e33579e52b124f1d8fa21 (diff)
Merge pull request #6791 from soltanmm/pain
Make running individual Python tests less painful
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/framework/interfaces')
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py30
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/base/__init__.py30
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/base/_control.py570
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/base/_sequence.py171
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/base/_state.py55
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/base/test_cases.py279
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/base/test_interfaces.py186
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py37
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py30
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py295
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py446
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py480
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py213
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_receiver.py95
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py316
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py396
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py67
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py229
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/links/__init__.py30
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/links/test_cases.py327
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/links/test_utilities.py167
21 files changed, 4449 insertions, 0 deletions
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/base/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/base/_control.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/_control.py
new file mode 100644
index 0000000000..0eb38abf22
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/_control.py
@@ -0,0 +1,570 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Part of the tests of the base interface of RPC Framework."""
+
+from __future__ import division
+
+import abc
+import collections
+import enum
+import random # pylint: disable=unused-import
+import threading
+import time
+
+import six
+
+from grpc.framework.interfaces.base import base
+from tests.unit.framework.common import test_constants
+from tests.unit.framework.interfaces.base import _sequence
+from tests.unit.framework.interfaces.base import _state
+from tests.unit.framework.interfaces.base import test_interfaces # pylint: disable=unused-import
+
+_GROUP = 'base test cases test group'
+_METHOD = 'base test cases test method'
+
+_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE = test_constants.PAYLOAD_SIZE // 20
+_MINIMUM_PAYLOAD_SIZE = test_constants.PAYLOAD_SIZE // 600
+
+
+def _create_payload(randomness):
+ length = randomness.randint(
+ _MINIMUM_PAYLOAD_SIZE, test_constants.PAYLOAD_SIZE)
+ random_section_length = randomness.randint(
+ 0, min(_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE, length))
+ random_section = bytes(
+ bytearray(
+ randomness.getrandbits(8) for _ in range(random_section_length)))
+ sevens_section = b'\x07' * (length - random_section_length)
+ return b''.join(randomness.sample((random_section, sevens_section), 2))
+
+
+def _anything_in_flight(state):
+ return (
+ state.invocation_initial_metadata_in_flight is not None or
+ state.invocation_payloads_in_flight or
+ state.invocation_completion_in_flight is not None or
+ state.service_initial_metadata_in_flight is not None or
+ state.service_payloads_in_flight or
+ state.service_completion_in_flight is not None or
+ 0 < state.invocation_allowance_in_flight or
+ 0 < state.service_allowance_in_flight
+ )
+
+
+def _verify_service_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, state, implementation):
+ if initial_metadata is not None:
+ if state.invocation_initial_metadata_received:
+ return 'Later invocation initial metadata received: %s' % (
+ initial_metadata,)
+ if state.invocation_payloads_received:
+ return 'Invocation initial metadata received after payloads: %s' % (
+ state.invocation_payloads_received)
+ if state.invocation_completion_received:
+ return 'Invocation initial metadata received after invocation completion!'
+ if not implementation.metadata_transmitted(
+ state.invocation_initial_metadata_in_flight, initial_metadata):
+ return 'Invocation initial metadata maltransmitted: %s, %s' % (
+ state.invocation_initial_metadata_in_flight, initial_metadata)
+ else:
+ state.invocation_initial_metadata_in_flight = None
+ state.invocation_initial_metadata_received = True
+
+ if payload is not None:
+ if state.invocation_completion_received:
+ return 'Invocation payload received after invocation completion!'
+ elif not state.invocation_payloads_in_flight:
+ return 'Invocation payload "%s" received but not in flight!' % (payload,)
+ elif state.invocation_payloads_in_flight[0] != payload:
+ return 'Invocation payload mismatch: %s, %s' % (
+ state.invocation_payloads_in_flight[0], payload)
+ elif state.service_side_invocation_allowance < 1:
+ return 'Disallowed invocation payload!'
+ else:
+ state.invocation_payloads_in_flight.pop(0)
+ state.invocation_payloads_received += 1
+ state.service_side_invocation_allowance -= 1
+
+ if completion is not None:
+ if state.invocation_completion_received:
+ return 'Later invocation completion received: %s' % (completion,)
+ elif not implementation.completion_transmitted(
+ state.invocation_completion_in_flight, completion):
+ return 'Invocation completion maltransmitted: %s, %s' % (
+ state.invocation_completion_in_flight, completion)
+ else:
+ state.invocation_completion_in_flight = None
+ state.invocation_completion_received = True
+
+ if allowance is not None:
+ if allowance <= 0:
+ return 'Illegal allowance value: %s' % (allowance,)
+ else:
+ state.service_allowance_in_flight -= allowance
+ state.service_side_service_allowance += allowance
+
+
+def _verify_invocation_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, state, implementation):
+ if initial_metadata is not None:
+ if state.service_initial_metadata_received:
+ return 'Later service initial metadata received: %s' % (initial_metadata,)
+ if state.service_payloads_received:
+ return 'Service initial metadata received after service payloads: %s' % (
+ state.service_payloads_received)
+ if state.service_completion_received:
+ return 'Service initial metadata received after service completion!'
+ if not implementation.metadata_transmitted(
+ state.service_initial_metadata_in_flight, initial_metadata):
+ return 'Service initial metadata maltransmitted: %s, %s' % (
+ state.service_initial_metadata_in_flight, initial_metadata)
+ else:
+ state.service_initial_metadata_in_flight = None
+ state.service_initial_metadata_received = True
+
+ if payload is not None:
+ if state.service_completion_received:
+ return 'Service payload received after service completion!'
+ elif not state.service_payloads_in_flight:
+ return 'Service payload "%s" received but not in flight!' % (payload,)
+ elif state.service_payloads_in_flight[0] != payload:
+ return 'Service payload mismatch: %s, %s' % (
+ state.invocation_payloads_in_flight[0], payload)
+ elif state.invocation_side_service_allowance < 1:
+ return 'Disallowed service payload!'
+ else:
+ state.service_payloads_in_flight.pop(0)
+ state.service_payloads_received += 1
+ state.invocation_side_service_allowance -= 1
+
+ if completion is not None:
+ if state.service_completion_received:
+ return 'Later service completion received: %s' % (completion,)
+ elif not implementation.completion_transmitted(
+ state.service_completion_in_flight, completion):
+ return 'Service completion maltransmitted: %s, %s' % (
+ state.service_completion_in_flight, completion)
+ else:
+ state.service_completion_in_flight = None
+ state.service_completion_received = True
+
+ if allowance is not None:
+ if allowance <= 0:
+ return 'Illegal allowance value: %s' % (allowance,)
+ else:
+ state.invocation_allowance_in_flight -= allowance
+ state.invocation_side_service_allowance += allowance
+
+
+class Invocation(
+ collections.namedtuple(
+ 'Invocation',
+ ('group', 'method', 'subscription_kind', 'timeout', 'initial_metadata',
+ 'payload', 'completion',))):
+ """A description of operation invocation.
+
+ Attributes:
+ group: The group identifier for the operation.
+ method: The method identifier for the operation.
+ subscription_kind: A base.Subscription.Kind value describing the kind of
+ subscription to use for the operation.
+ timeout: A duration in seconds to pass as the timeout value for the
+ operation.
+ initial_metadata: An object to pass as the initial metadata for the
+ operation or None.
+ payload: An object to pass as a payload value for the operation or None.
+ completion: An object to pass as a completion value for the operation or
+ None.
+ """
+
+
+class OnAdvance(
+ collections.namedtuple(
+ 'OnAdvance',
+ ('kind', 'initial_metadata', 'payload', 'completion', 'allowance'))):
+ """Describes action to be taken in a test in response to an advance call.
+
+ Attributes:
+ kind: A Kind value describing the overall kind of response.
+ initial_metadata: An initial metadata value to pass to a call of the advance
+ method of the operator under test. Only valid if kind is Kind.ADVANCE and
+ may be None.
+ payload: A payload value to pass to a call of the advance method of the
+ operator under test. Only valid if kind is Kind.ADVANCE and may be None.
+ completion: A base.Completion value to pass to a call of the advance method
+ of the operator under test. Only valid if kind is Kind.ADVANCE and may be
+ None.
+ allowance: An allowance value to pass to a call of the advance method of the
+ operator under test. Only valid if kind is Kind.ADVANCE and may be None.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ ADVANCE = 'advance'
+ DEFECT = 'defect'
+ IDLE = 'idle'
+
+
+_DEFECT_ON_ADVANCE = OnAdvance(OnAdvance.Kind.DEFECT, None, None, None, None)
+_IDLE_ON_ADVANCE = OnAdvance(OnAdvance.Kind.IDLE, None, None, None, None)
+
+
+class Instruction(
+ collections.namedtuple(
+ 'Instruction',
+ ('kind', 'advance_args', 'advance_kwargs', 'conclude_success',
+ 'conclude_message', 'conclude_invocation_outcome_kind',
+ 'conclude_service_outcome_kind',))):
+ """"""
+
+ @enum.unique
+ class Kind(enum.Enum):
+ ADVANCE = 'ADVANCE'
+ CANCEL = 'CANCEL'
+ CONCLUDE = 'CONCLUDE'
+
+
+class Controller(six.with_metaclass(abc.ABCMeta)):
+
+ @abc.abstractmethod
+ def failed(self, message):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_request(self, request):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_request(self, serialized_request):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_response(self, response):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_response(self, serialized_response):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation(self):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def poll(self):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def on_service_advance(
+ self, initial_metadata, payload, completion, allowance):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def on_invocation_advance(
+ self, initial_metadata, payload, completion, allowance):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def service_on_termination(self, outcome):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation_on_termination(self, outcome):
+ """"""
+ raise NotImplementedError()
+
+
+class ControllerCreator(six.with_metaclass(abc.ABCMeta)):
+
+ @abc.abstractmethod
+ def name(self):
+ """"""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def controller(self, implementation, randomness):
+ """"""
+ raise NotImplementedError()
+
+
+class _Remainder(
+ collections.namedtuple(
+ '_Remainder',
+ ('invocation_payloads', 'service_payloads', 'invocation_completion',
+ 'service_completion',))):
+ """Describes work remaining to be done in a portion of a test.
+
+ Attributes:
+ invocation_payloads: The number of payloads to be sent from the invocation
+ side of the operation to the service side of the operation.
+ service_payloads: The number of payloads to be sent from the service side of
+ the operation to the invocation side of the operation.
+ invocation_completion: Whether or not completion from the invocation side of
+ the operation should be indicated and has yet to be indicated.
+ service_completion: Whether or not completion from the service side of the
+ operation should be indicated and has yet to be indicated.
+ """
+
+
+class _SequenceController(Controller):
+
+ def __init__(self, sequence, implementation, randomness):
+ """Constructor.
+
+ Args:
+ sequence: A _sequence.Sequence describing the steps to be taken in the
+ test at a relatively high level.
+ implementation: A test_interfaces.Implementation encapsulating the
+ base interface implementation that is the system under test.
+ randomness: A random.Random instance for use in the test.
+ """
+ self._condition = threading.Condition()
+ self._sequence = sequence
+ self._implementation = implementation
+ self._randomness = randomness
+
+ self._until = None
+ self._remaining_elements = None
+ self._poll_next = None
+ self._message = None
+
+ self._state = _state.OperationState()
+ self._todo = None
+
+ # called with self._condition
+ def _failed(self, message):
+ self._message = message
+ self._condition.notify_all()
+
+ def _passed(self, invocation_outcome, service_outcome):
+ self._poll_next = Instruction(
+ Instruction.Kind.CONCLUDE, None, None, True, None, invocation_outcome,
+ service_outcome)
+ self._condition.notify_all()
+
+ def failed(self, message):
+ with self._condition:
+ self._failed(message)
+
+ def serialize_request(self, request):
+ return request + request
+
+ def deserialize_request(self, serialized_request):
+ return serialized_request[:len(serialized_request) // 2]
+
+ def serialize_response(self, response):
+ return response * 3
+
+ def deserialize_response(self, serialized_response):
+ return serialized_response[2 * len(serialized_response) // 3:]
+
+ def invocation(self):
+ with self._condition:
+ self._until = time.time() + self._sequence.maximum_duration
+ self._remaining_elements = list(self._sequence.elements)
+ if self._sequence.invocation.initial_metadata:
+ initial_metadata = self._implementation.invocation_initial_metadata()
+ self._state.invocation_initial_metadata_in_flight = initial_metadata
+ else:
+ initial_metadata = None
+ if self._sequence.invocation.payload:
+ payload = _create_payload(self._randomness)
+ self._state.invocation_payloads_in_flight.append(payload)
+ else:
+ payload = None
+ if self._sequence.invocation.complete:
+ completion = self._implementation.invocation_completion()
+ self._state.invocation_completion_in_flight = completion
+ else:
+ completion = None
+ return Invocation(
+ _GROUP, _METHOD, base.Subscription.Kind.FULL,
+ self._sequence.invocation.timeout, initial_metadata, payload,
+ completion)
+
+ def poll(self):
+ with self._condition:
+ while True:
+ if self._message is not None:
+ return Instruction(
+ Instruction.Kind.CONCLUDE, None, None, False, self._message, None,
+ None)
+ elif self._poll_next:
+ poll_next = self._poll_next
+ self._poll_next = None
+ return poll_next
+ elif self._until < time.time():
+ return Instruction(
+ Instruction.Kind.CONCLUDE, None, None, False,
+ 'overran allotted time!', None, None)
+ else:
+ self._condition.wait(timeout=self._until-time.time())
+
+ def on_service_advance(
+ self, initial_metadata, payload, completion, allowance):
+ with self._condition:
+ message = _verify_service_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, self._state,
+ self._implementation)
+ if message is not None:
+ self._failed(message)
+ if self._todo is not None:
+ raise ValueError('TODO!!!')
+ elif _anything_in_flight(self._state):
+ return _IDLE_ON_ADVANCE
+ elif self._remaining_elements:
+ element = self._remaining_elements.pop(0)
+ if element.kind is _sequence.Element.Kind.SERVICE_TRANSMISSION:
+ if element.transmission.initial_metadata:
+ initial_metadata = self._implementation.service_initial_metadata()
+ self._state.service_initial_metadata_in_flight = initial_metadata
+ else:
+ initial_metadata = None
+ if element.transmission.payload:
+ payload = _create_payload(self._randomness)
+ self._state.service_payloads_in_flight.append(payload)
+ self._state.service_side_service_allowance -= 1
+ else:
+ payload = None
+ if element.transmission.complete:
+ completion = self._implementation.service_completion()
+ self._state.service_completion_in_flight = completion
+ else:
+ completion = None
+ if (not self._state.invocation_completion_received and
+ 0 <= self._state.service_side_invocation_allowance):
+ allowance = 1
+ self._state.service_side_invocation_allowance += 1
+ self._state.invocation_allowance_in_flight += 1
+ else:
+ allowance = None
+ return OnAdvance(
+ OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion,
+ allowance)
+ else:
+ raise ValueError('TODO!!!')
+ else:
+ return _IDLE_ON_ADVANCE
+
+ def on_invocation_advance(
+ self, initial_metadata, payload, completion, allowance):
+ with self._condition:
+ message = _verify_invocation_advance_and_update_state(
+ initial_metadata, payload, completion, allowance, self._state,
+ self._implementation)
+ if message is not None:
+ self._failed(message)
+ if self._todo is not None:
+ raise ValueError('TODO!!!')
+ elif _anything_in_flight(self._state):
+ return _IDLE_ON_ADVANCE
+ elif self._remaining_elements:
+ element = self._remaining_elements.pop(0)
+ if element.kind is _sequence.Element.Kind.INVOCATION_TRANSMISSION:
+ if element.transmission.initial_metadata:
+ initial_metadata = self._implementation.invocation_initial_metadata()
+ self._state.invocation_initial_metadata_in_fight = initial_metadata
+ else:
+ initial_metadata = None
+ if element.transmission.payload:
+ payload = _create_payload(self._randomness)
+ self._state.invocation_payloads_in_flight.append(payload)
+ self._state.invocation_side_invocation_allowance -= 1
+ else:
+ payload = None
+ if element.transmission.complete:
+ completion = self._implementation.invocation_completion()
+ self._state.invocation_completion_in_flight = completion
+ else:
+ completion = None
+ if (not self._state.service_completion_received and
+ 0 <= self._state.invocation_side_service_allowance):
+ allowance = 1
+ self._state.invocation_side_service_allowance += 1
+ self._state.service_allowance_in_flight += 1
+ else:
+ allowance = None
+ return OnAdvance(
+ OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion,
+ allowance)
+ else:
+ raise ValueError('TODO!!!')
+ else:
+ return _IDLE_ON_ADVANCE
+
+ def service_on_termination(self, outcome):
+ with self._condition:
+ self._state.service_side_outcome = outcome
+ if self._todo is not None or self._remaining_elements:
+ self._failed('Premature service-side outcome %s!' % (outcome,))
+ elif outcome.kind is not self._sequence.outcome_kinds.service:
+ self._failed(
+ 'Incorrect service-side outcome kind: %s should have been %s' % (
+ outcome.kind, self._sequence.outcome_kinds.service))
+ elif self._state.invocation_side_outcome is not None:
+ self._passed(self._state.invocation_side_outcome.kind, outcome.kind)
+
+ def invocation_on_termination(self, outcome):
+ with self._condition:
+ self._state.invocation_side_outcome = outcome
+ if self._todo is not None or self._remaining_elements:
+ self._failed('Premature invocation-side outcome %s!' % (outcome,))
+ elif outcome.kind is not self._sequence.outcome_kinds.invocation:
+ self._failed(
+ 'Incorrect invocation-side outcome kind: %s should have been %s' % (
+ outcome.kind, self._sequence.outcome_kinds.invocation))
+ elif self._state.service_side_outcome is not None:
+ self._passed(outcome.kind, self._state.service_side_outcome.kind)
+
+
+class _SequenceControllerCreator(ControllerCreator):
+
+ def __init__(self, sequence):
+ self._sequence = sequence
+
+ def name(self):
+ return self._sequence.name
+
+ def controller(self, implementation, randomness):
+ return _SequenceController(self._sequence, implementation, randomness)
+
+
+CONTROLLER_CREATORS = tuple(
+ _SequenceControllerCreator(sequence) for sequence in _sequence.SEQUENCES)
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/base/_sequence.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/_sequence.py
new file mode 100644
index 0000000000..571d0e1e63
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/_sequence.py
@@ -0,0 +1,171 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Part of the tests of the base interface of RPC Framework."""
+
+import collections
+import enum
+
+from grpc.framework.interfaces.base import base
+from tests.unit.framework.common import test_constants
+
+
+class Invocation(
+ collections.namedtuple(
+ 'Invocation', ('timeout', 'initial_metadata', 'payload', 'complete',))):
+ """A recipe for operation invocation.
+
+ Attributes:
+ timeout: A duration in seconds to pass to the system under test as the
+ operation's timeout value.
+ initial_metadata: A boolean indicating whether or not to pass initial
+ metadata when invoking the operation.
+ payload: A boolean indicating whether or not to pass a payload when
+ invoking the operation.
+ complete: A boolean indicating whether or not to indicate completion of
+ transmissions from the invoking side of the operation when invoking the
+ operation.
+ """
+
+
+class Transmission(
+ collections.namedtuple(
+ 'Transmission', ('initial_metadata', 'payload', 'complete',))):
+ """A recipe for a single transmission in an operation.
+
+ Attributes:
+ initial_metadata: A boolean indicating whether or not to pass initial
+ metadata as part of the transmission.
+ payload: A boolean indicating whether or not to pass a payload as part of
+ the transmission.
+ complete: A boolean indicating whether or not to indicate completion of
+ transmission from the transmitting side of the operation as part of the
+ transmission.
+ """
+
+
+class Intertransmission(
+ collections.namedtuple('Intertransmission', ('invocation', 'service',))):
+ """A recipe for multiple transmissions in an operation.
+
+ Attributes:
+ invocation: An integer describing the number of payloads to send from the
+ invocation side of the operation to the service side.
+ service: An integer describing the number of payloads to send from the
+ service side of the operation to the invocation side.
+ """
+
+
+class Element(collections.namedtuple('Element', ('kind', 'transmission',))):
+ """A sum type for steps to perform when testing an operation.
+
+ Attributes:
+ kind: A Kind value describing the kind of step to perform in the test.
+ transmission: Only valid for kinds Kind.INVOCATION_TRANSMISSION and
+ Kind.SERVICE_TRANSMISSION, a Transmission value describing the details of
+ the transmission to be made.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ INVOCATION_TRANSMISSION = 'invocation transmission'
+ SERVICE_TRANSMISSION = 'service transmission'
+ INTERTRANSMISSION = 'intertransmission'
+ INVOCATION_CANCEL = 'invocation cancel'
+ SERVICE_CANCEL = 'service cancel'
+ INVOCATION_FAILURE = 'invocation failure'
+ SERVICE_FAILURE = 'service failure'
+
+
+class OutcomeKinds(
+ collections.namedtuple('Outcome', ('invocation', 'service',))):
+ """A description of the expected outcome of an operation test.
+
+ Attributes:
+ invocation: The base.Outcome.Kind value expected on the invocation side of
+ the operation.
+ service: The base.Outcome.Kind value expected on the service side of the
+ operation.
+ """
+
+
+class Sequence(
+ collections.namedtuple(
+ 'Sequence',
+ ('name', 'maximum_duration', 'invocation', 'elements',
+ 'outcome_kinds',))):
+ """Describes at a high level steps to perform in a test.
+
+ Attributes:
+ name: The string name of the sequence.
+ maximum_duration: A length of time in seconds to allow for the test before
+ declaring it to have failed.
+ invocation: An Invocation value describing how to invoke the operation
+ under test.
+ elements: A sequence of Element values describing at coarse granularity
+ actions to take during the operation under test.
+ outcome_kinds: An OutcomeKinds value describing the expected outcome kinds
+ of the test.
+ """
+
+_EASY = Sequence(
+ 'Easy',
+ test_constants.TIME_ALLOWANCE,
+ Invocation(test_constants.LONG_TIMEOUT, True, True, True),
+ (
+ Element(
+ Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, True)),
+ ),
+ OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED))
+
+_PEASY = Sequence(
+ 'Peasy',
+ test_constants.TIME_ALLOWANCE,
+ Invocation(test_constants.LONG_TIMEOUT, True, True, False),
+ (
+ Element(
+ Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, False)),
+ Element(
+ Element.Kind.INVOCATION_TRANSMISSION,
+ Transmission(False, True, True)),
+ Element(
+ Element.Kind.SERVICE_TRANSMISSION, Transmission(False, True, True)),
+ ),
+ OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED))
+
+
+# TODO(issue 2959): Finish this test suite. This tuple of sequences should
+# contain at least the values in the Cartesian product of (half-duplex,
+# full-duplex) * (zero payloads, one payload, test_constants.STREAM_LENGTH
+# payloads) * (completion, cancellation, expiration, programming defect in
+# servicer code).
+SEQUENCES = (
+ _EASY,
+ _PEASY,
+)
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/base/_state.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/_state.py
new file mode 100644
index 0000000000..21cf33aeb6
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/_state.py
@@ -0,0 +1,55 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Part of the tests of the base interface of RPC Framework."""
+
+
+class OperationState(object):
+
+ def __init__(self):
+ self.invocation_initial_metadata_in_flight = None
+ self.invocation_initial_metadata_received = False
+ self.invocation_payloads_in_flight = []
+ self.invocation_payloads_received = 0
+ self.invocation_completion_in_flight = None
+ self.invocation_completion_received = False
+ self.service_initial_metadata_in_flight = None
+ self.service_initial_metadata_received = False
+ self.service_payloads_in_flight = []
+ self.service_payloads_received = 0
+ self.service_completion_in_flight = None
+ self.service_completion_received = False
+ self.invocation_side_invocation_allowance = 1
+ self.invocation_side_service_allowance = 1
+ self.service_side_invocation_allowance = 1
+ self.service_side_service_allowance = 1
+ self.invocation_allowance_in_flight = 0
+ self.service_allowance_in_flight = 0
+ self.invocation_side_outcome = None
+ self.service_side_outcome = None
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/base/test_cases.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/test_cases.py
new file mode 100644
index 0000000000..5d16bf98be
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/test_cases.py
@@ -0,0 +1,279 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of the base interface of RPC Framework."""
+
+from __future__ import division
+
+import logging
+import random
+import threading
+import time
+import unittest
+
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.base import base
+from grpc.framework.interfaces.base import utilities
+from tests.unit.framework.common import test_constants
+from tests.unit.framework.interfaces.base import _control
+from tests.unit.framework.interfaces.base import test_interfaces
+
+_SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True))
+
+_EMPTY_OUTCOME_KIND_DICT = {
+ outcome_kind: 0 for outcome_kind in base.Outcome.Kind}
+
+
+class _Serialization(test_interfaces.Serialization):
+
+ def serialize_request(self, request):
+ return request + request
+
+ def deserialize_request(self, serialized_request):
+ return serialized_request[:len(serialized_request) // 2]
+
+ def serialize_response(self, response):
+ return response * 3
+
+ def deserialize_response(self, serialized_response):
+ return serialized_response[2 * len(serialized_response) // 3:]
+
+
+def _advance(quadruples, operator, controller):
+ try:
+ for quadruple in quadruples:
+ operator.advance(
+ initial_metadata=quadruple[0], payload=quadruple[1],
+ completion=quadruple[2], allowance=quadruple[3])
+ except Exception as e: # pylint: disable=broad-except
+ controller.failed('Exception on advance: %e' % e)
+
+
+class _Operator(base.Operator):
+
+ def __init__(self, controller, on_advance, pool, operator_under_test):
+ self._condition = threading.Condition()
+ self._controller = controller
+ self._on_advance = on_advance
+ self._pool = pool
+ self._operator_under_test = operator_under_test
+ self._pending_advances = []
+
+ def set_operator_under_test(self, operator_under_test):
+ with self._condition:
+ self._operator_under_test = operator_under_test
+ pent_advances = self._pending_advances
+ self._pending_advances = []
+ pool = self._pool
+ controller = self._controller
+
+ if pool is None:
+ _advance(pent_advances, operator_under_test, controller)
+ else:
+ pool.submit(_advance, pent_advances, operator_under_test, controller)
+
+ def advance(
+ self, initial_metadata=None, payload=None, completion=None,
+ allowance=None):
+ on_advance = self._on_advance(
+ initial_metadata, payload, completion, allowance)
+ if on_advance.kind is _control.OnAdvance.Kind.ADVANCE:
+ with self._condition:
+ pool = self._pool
+ operator_under_test = self._operator_under_test
+ controller = self._controller
+
+ quadruple = (
+ on_advance.initial_metadata, on_advance.payload,
+ on_advance.completion, on_advance.allowance)
+ if pool is None:
+ _advance((quadruple,), operator_under_test, controller)
+ else:
+ pool.submit(_advance, (quadruple,), operator_under_test, controller)
+ elif on_advance.kind is _control.OnAdvance.Kind.DEFECT:
+ raise ValueError(
+ 'Deliberately raised exception from Operator.advance (in a test)!')
+
+
+class _ProtocolReceiver(base.ProtocolReceiver):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._contexts = []
+
+ def context(self, protocol_context):
+ with self._condition:
+ self._contexts.append(protocol_context)
+
+
+class _Servicer(base.Servicer):
+ """A base.Servicer with instrumented for testing."""
+
+ def __init__(self, group, method, controllers, pool):
+ self._condition = threading.Condition()
+ self._group = group
+ self._method = method
+ self._pool = pool
+ self._controllers = list(controllers)
+
+ def service(self, group, method, context, output_operator):
+ with self._condition:
+ controller = self._controllers.pop(0)
+ if group != self._group or method != self._method:
+ controller.fail(
+ '%s != %s or %s != %s' % (group, self._group, method, self._method))
+ raise base.NoSuchMethodError(None, None)
+ else:
+ operator = _Operator(
+ controller, controller.on_service_advance, self._pool,
+ output_operator)
+ outcome = context.add_termination_callback(
+ controller.service_on_termination)
+ if outcome is not None:
+ controller.service_on_termination(outcome)
+ return utilities.full_subscription(operator, _ProtocolReceiver())
+
+
+class _OperationTest(unittest.TestCase):
+
+ def setUp(self):
+ if self._synchronicity_variation:
+ self._pool = logging_pool.pool(test_constants.POOL_SIZE)
+ else:
+ self._pool = None
+ self._controller = self._controller_creator.controller(
+ self._implementation, self._randomness)
+
+ def tearDown(self):
+ if self._synchronicity_variation:
+ self._pool.shutdown(wait=True)
+ else:
+ self._pool = None
+
+ def test_operation(self):
+ invocation = self._controller.invocation()
+ if invocation.subscription_kind is base.Subscription.Kind.FULL:
+ test_operator = _Operator(
+ self._controller, self._controller.on_invocation_advance,
+ self._pool, None)
+ subscription = utilities.full_subscription(
+ test_operator, _ProtocolReceiver())
+ else:
+ # TODO(nathaniel): support and test other subscription kinds.
+ self.fail('Non-full subscriptions not yet supported!')
+
+ servicer = _Servicer(
+ invocation.group, invocation.method, (self._controller,), self._pool)
+
+ invocation_end, service_end, memo = self._implementation.instantiate(
+ {(invocation.group, invocation.method): _Serialization()}, servicer)
+
+ try:
+ invocation_end.start()
+ service_end.start()
+ operation_context, operator_under_test = invocation_end.operate(
+ invocation.group, invocation.method, subscription, invocation.timeout,
+ initial_metadata=invocation.initial_metadata, payload=invocation.payload,
+ completion=invocation.completion)
+ test_operator.set_operator_under_test(operator_under_test)
+ outcome = operation_context.add_termination_callback(
+ self._controller.invocation_on_termination)
+ if outcome is not None:
+ self._controller.invocation_on_termination(outcome)
+ except Exception as e: # pylint: disable=broad-except
+ self._controller.failed('Exception on invocation: %s' % e)
+ self.fail(e)
+
+ while True:
+ instruction = self._controller.poll()
+ if instruction.kind is _control.Instruction.Kind.ADVANCE:
+ try:
+ test_operator.advance(
+ *instruction.advance_args, **instruction.advance_kwargs)
+ except Exception as e: # pylint: disable=broad-except
+ self._controller.failed('Exception on instructed advance: %s' % e)
+ elif instruction.kind is _control.Instruction.Kind.CANCEL:
+ try:
+ operation_context.cancel()
+ except Exception as e: # pylint: disable=broad-except
+ self._controller.failed('Exception on cancel: %s' % e)
+ elif instruction.kind is _control.Instruction.Kind.CONCLUDE:
+ break
+
+ invocation_stop_event = invocation_end.stop(0)
+ service_stop_event = service_end.stop(0)
+ invocation_stop_event.wait()
+ service_stop_event.wait()
+ invocation_stats = invocation_end.operation_stats()
+ service_stats = service_end.operation_stats()
+
+ self._implementation.destantiate(memo)
+
+ self.assertTrue(
+ instruction.conclude_success, msg=instruction.conclude_message)
+
+ expected_invocation_stats = dict(_EMPTY_OUTCOME_KIND_DICT)
+ expected_invocation_stats[
+ instruction.conclude_invocation_outcome_kind] += 1
+ self.assertDictEqual(expected_invocation_stats, invocation_stats)
+ expected_service_stats = dict(_EMPTY_OUTCOME_KIND_DICT)
+ expected_service_stats[instruction.conclude_service_outcome_kind] += 1
+ self.assertDictEqual(expected_service_stats, service_stats)
+
+
+def test_cases(implementation):
+ """Creates unittest.TestCase classes for a given Base implementation.
+
+ Args:
+ implementation: A test_interfaces.Implementation specifying creation and
+ destruction of the Base implementation under test.
+
+ Returns:
+ A sequence of subclasses of unittest.TestCase defining tests of the
+ specified Base layer implementation.
+ """
+ random_seed = hash(time.time())
+ logging.warning('Random seed for this execution: %s', random_seed)
+ randomness = random.Random(x=random_seed)
+
+ test_case_classes = []
+ for synchronicity_variation in _SYNCHRONICITY_VARIATION:
+ for controller_creator in _control.CONTROLLER_CREATORS:
+ name = ''.join(
+ (synchronicity_variation[0], controller_creator.name(), 'Test',))
+ test_case_classes.append(
+ type(name, (_OperationTest,),
+ {'_implementation': implementation,
+ '_randomness': randomness,
+ '_synchronicity_variation': synchronicity_variation[1],
+ '_controller_creator': controller_creator,
+ '__module__': implementation.__module__,
+ }))
+
+ return test_case_classes
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/base/test_interfaces.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/test_interfaces.py
new file mode 100644
index 0000000000..5eba475ba8
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/base/test_interfaces.py
@@ -0,0 +1,186 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces used in tests of implementations of the Base layer."""
+
+import abc
+
+import six
+
+from grpc.framework.interfaces.base import base # pylint: disable=unused-import
+
+
+class Serialization(six.with_metaclass(abc.ABCMeta)):
+ """Specifies serialization and deserialization of test payloads."""
+
+ def serialize_request(self, request):
+ """Serializes a request value used in a test.
+
+ Args:
+ request: A request value created by a test.
+
+ Returns:
+ A bytestring that is the serialization of the given request.
+ """
+ raise NotImplementedError()
+
+ def deserialize_request(self, serialized_request):
+ """Deserializes a request value used in a test.
+
+ Args:
+ serialized_request: A bytestring that is the serialization of some request
+ used in a test.
+
+ Returns:
+ The request value encoded by the given bytestring.
+ """
+ raise NotImplementedError()
+
+ def serialize_response(self, response):
+ """Serializes a response value used in a test.
+
+ Args:
+ response: A response value created by a test.
+
+ Returns:
+ A bytestring that is the serialization of the given response.
+ """
+ raise NotImplementedError()
+
+ def deserialize_response(self, serialized_response):
+ """Deserializes a response value used in a test.
+
+ Args:
+ serialized_response: A bytestring that is the serialization of some
+ response used in a test.
+
+ Returns:
+ The response value encoded by the given bytestring.
+ """
+ raise NotImplementedError()
+
+
+class Implementation(six.with_metaclass(abc.ABCMeta)):
+ """Specifies an implementation of the Base layer."""
+
+ @abc.abstractmethod
+ def instantiate(self, serializations, servicer):
+ """Instantiates the Base layer implementation to be used in a test.
+
+ Args:
+ serializations: A dict from group-method pair to Serialization object
+ specifying how to serialize and deserialize payload values used in the
+ test.
+ servicer: A base.Servicer object to be called to service RPCs made during
+ the test.
+
+ Returns:
+ A sequence of length three the first element of which is a
+ base.End to be used to invoke RPCs, the second element of which is a
+ base.End to be used to service invoked RPCs, and the third element of
+ which is an arbitrary memo object to be kept and passed to destantiate
+ at the conclusion of the test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def destantiate(self, memo):
+ """Destroys the Base layer implementation under test.
+
+ Args:
+ memo: The object from the third position of the return value of a call to
+ instantiate.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation_initial_metadata(self):
+ """Provides an operation's invocation-side initial metadata.
+
+ Returns:
+ A value to use for an operation's invocation-side initial metadata, or
+ None.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def service_initial_metadata(self):
+ """Provides an operation's service-side initial metadata.
+
+ Returns:
+ A value to use for an operation's service-side initial metadata, or
+ None.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation_completion(self):
+ """Provides an operation's invocation-side completion.
+
+ Returns:
+ A base.Completion to use for an operation's invocation-side completion.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def service_completion(self):
+ """Provides an operation's service-side completion.
+
+ Returns:
+ A base.Completion to use for an operation's service-side completion.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ """Identifies whether or not metadata was properly transmitted.
+
+ Args:
+ original_metadata: A metadata value passed to the system under test.
+ transmitted_metadata: The same metadata value after having been
+ transmitted through the system under test.
+
+ Returns:
+ Whether or not the metadata was properly transmitted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def completion_transmitted(self, original_completion, transmitted_completion):
+ """Identifies whether or not a base.Completion was properly transmitted.
+
+ Args:
+ original_completion: A base.Completion passed to the system under test.
+ transmitted_completion: The same completion value after having been
+ transmitted through the system under test.
+
+ Returns:
+ Whether or not the completion was properly transmitted.
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py
new file mode 100644
index 0000000000..1ea356c0bf
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py
@@ -0,0 +1,37 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A test constant working around issue 3069."""
+
+# test_constants is referenced from specification in this module.
+from tests.unit.framework.common import test_constants # pylint: disable=unused-import
+
+# TODO(issue 3069): Replace uses of this constant with
+# test_constants.SHORT_TIMEOUT.
+REALLY_SHORT_TIMEOUT = 0.1
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
new file mode 100644
index 0000000000..e338aaa396
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -0,0 +1,295 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test code for the Face layer of RPC Framework."""
+
+from __future__ import division
+
+import abc
+import itertools
+import unittest
+from concurrent import futures
+
+import six
+
+# test_interfaces is referenced from specification in this module.
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.face import face
+from tests.unit.framework.common import test_constants
+from tests.unit.framework.common import test_control
+from tests.unit.framework.common import test_coverage
+from tests.unit.framework.interfaces.face import _3069_test_constant
+from tests.unit.framework.interfaces.face import _digest
+from tests.unit.framework.interfaces.face import _stock_service
+from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
+
+
+class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.TestCase)):
+ """A test of the Face layer of RPC Framework.
+
+ Concrete subclasses must have an "implementation" attribute of type
+ test_interfaces.Implementation and an "invoker_constructor" attribute of type
+ _invocation.InvokerConstructor.
+ """
+
+ NAME = 'BlockingInvocationInlineServiceTest'
+
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self._control = test_control.PauseFailControl()
+ self._digest = _digest.digest(
+ _stock_service.STOCK_TEST_SERVICE, self._control, None)
+
+ generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
+ self._digest.methods, self._digest.inline_method_implementations, None)
+ self._invoker = self.invoker_constructor.construct_invoker(
+ generic_stub, dynamic_stubs, self._digest.methods)
+
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self._invoker = None
+ self.implementation.destantiate(self._memo)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response, call = self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT, with_call=True)
+
+ test_messages.verify(request, response, self)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_iterator = self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ response, call = self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT, with_call=True)
+
+ test_messages.verify(requests, response, self)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ response_iterator = self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response = self._invoker.blocking(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+
+ test_messages.verify(first_request, first_response, self)
+
+ second_response = self._invoker.blocking(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+
+ test_messages.verify(second_request, second_response, self)
+
+ def testParallelInvocations(self):
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures = []
+ for _ in range(test_constants.THREAD_CONCURRENCY):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures.append(response_future)
+
+ responses = [
+ response_future.result() for response_future in response_futures]
+
+ for request, response in zip(requests, responses):
+ test_messages.verify(request, response, self)
+ pool.shutdown(wait=True)
+
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.THREAD_CONCURRENCY):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures_to_indices[response_future] = index
+
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.THREAD_CONCURRENCY // 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index], response_future.result(), self)
+ pool.shutdown(wait=True)
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledStreamRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ self._invoker.blocking(group, method)(
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ response_iterator = self._invoker.blocking(group, method)(
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ list(response_iterator)
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ self._invoker.blocking(group, method)(
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ response_iterator = self._invoker.blocking(group, method)(
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ list(response_iterator)
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ response_iterator = self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ list(response_iterator)
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+
+ def testFailedStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ response_iterator = self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ list(response_iterator)
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py
new file mode 100644
index 0000000000..f0befb0b27
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py
@@ -0,0 +1,446 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Code for making a service.TestService more amenable to use in tests."""
+
+import collections
+import threading
+
+import six
+
+# test_control, _service, and test_interfaces are referenced from specification
+# in this module.
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.foundation import stream
+from grpc.framework.foundation import stream_util
+from grpc.framework.interfaces.face import face
+from tests.unit.framework.common import test_control # pylint: disable=unused-import
+from tests.unit.framework.interfaces.face import _service # pylint: disable=unused-import
+from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
+
+_IDENTITY = lambda x: x
+
+
+class TestServiceDigest(
+ collections.namedtuple(
+ 'TestServiceDigest',
+ ('methods',
+ 'inline_method_implementations',
+ 'event_method_implementations',
+ 'multi_method_implementation',
+ 'unary_unary_messages_sequences',
+ 'unary_stream_messages_sequences',
+ 'stream_unary_messages_sequences',
+ 'stream_stream_messages_sequences',))):
+ """A transformation of a service.TestService.
+
+ Attributes:
+ methods: A dict from method group-name pair to test_interfaces.Method object
+ describing the RPC methods that may be called during the test.
+ inline_method_implementations: A dict from method group-name pair to
+ face.MethodImplementation object to be used in tests of in-line calls to
+ behaviors under test.
+ event_method_implementations: A dict from method group-name pair to
+ face.MethodImplementation object to be used in tests of event-driven calls
+ to behaviors under test.
+ multi_method_implementation: A face.MultiMethodImplementation to be used in
+ tests of generic calls to behaviors under test.
+ unary_unary_messages_sequences: A dict from method group-name pair to
+ sequence of service.UnaryUnaryTestMessages objects to be used to test the
+ identified method.
+ unary_stream_messages_sequences: A dict from method group-name pair to
+ sequence of service.UnaryStreamTestMessages objects to be used to test the
+ identified method.
+ stream_unary_messages_sequences: A dict from method group-name pair to
+ sequence of service.StreamUnaryTestMessages objects to be used to test the
+ identified method.
+ stream_stream_messages_sequences: A dict from method group-name pair to
+ sequence of service.StreamStreamTestMessages objects to be used to test
+ the identified method.
+ """
+
+
+class _BufferingConsumer(stream.Consumer):
+ """A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
+
+ def __init__(self):
+ self.consumed = []
+ self.terminated = False
+
+ def consume(self, value):
+ self.consumed.append(value)
+
+ def terminate(self):
+ self.terminated = True
+
+ def consume_and_terminate(self, value):
+ self.consumed.append(value)
+ self.terminated = True
+
+
+class _InlineUnaryUnaryMethod(face.MethodImplementation):
+
+ def __init__(self, unary_unary_test_method, control):
+ self._test_method = unary_unary_test_method
+ self._control = control
+
+ self.cardinality = cardinality.Cardinality.UNARY_UNARY
+ self.style = style.Service.INLINE
+
+ def unary_unary_inline(self, request, context):
+ response_list = []
+ self._test_method.service(
+ request, response_list.append, context, self._control)
+ return response_list.pop(0)
+
+
+class _EventUnaryUnaryMethod(face.MethodImplementation):
+
+ def __init__(self, unary_unary_test_method, control, pool):
+ self._test_method = unary_unary_test_method
+ self._control = control
+ self._pool = pool
+
+ self.cardinality = cardinality.Cardinality.UNARY_UNARY
+ self.style = style.Service.EVENT
+
+ def unary_unary_event(self, request, response_callback, context):
+ if self._pool is None:
+ self._test_method.service(
+ request, response_callback, context, self._control)
+ else:
+ self._pool.submit(
+ self._test_method.service, request, response_callback, context,
+ self._control)
+
+
+class _InlineUnaryStreamMethod(face.MethodImplementation):
+
+ def __init__(self, unary_stream_test_method, control):
+ self._test_method = unary_stream_test_method
+ self._control = control
+
+ self.cardinality = cardinality.Cardinality.UNARY_STREAM
+ self.style = style.Service.INLINE
+
+ def unary_stream_inline(self, request, context):
+ response_consumer = _BufferingConsumer()
+ self._test_method.service(
+ request, response_consumer, context, self._control)
+ for response in response_consumer.consumed:
+ yield response
+
+
+class _EventUnaryStreamMethod(face.MethodImplementation):
+
+ def __init__(self, unary_stream_test_method, control, pool):
+ self._test_method = unary_stream_test_method
+ self._control = control
+ self._pool = pool
+
+ self.cardinality = cardinality.Cardinality.UNARY_STREAM
+ self.style = style.Service.EVENT
+
+ def unary_stream_event(self, request, response_consumer, context):
+ if self._pool is None:
+ self._test_method.service(
+ request, response_consumer, context, self._control)
+ else:
+ self._pool.submit(
+ self._test_method.service, request, response_consumer, context,
+ self._control)
+
+
+class _InlineStreamUnaryMethod(face.MethodImplementation):
+
+ def __init__(self, stream_unary_test_method, control):
+ self._test_method = stream_unary_test_method
+ self._control = control
+
+ self.cardinality = cardinality.Cardinality.STREAM_UNARY
+ self.style = style.Service.INLINE
+
+ def stream_unary_inline(self, request_iterator, context):
+ response_list = []
+ request_consumer = self._test_method.service(
+ response_list.append, context, self._control)
+ for request in request_iterator:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ return response_list.pop(0)
+
+
+class _EventStreamUnaryMethod(face.MethodImplementation):
+
+ def __init__(self, stream_unary_test_method, control, pool):
+ self._test_method = stream_unary_test_method
+ self._control = control
+ self._pool = pool
+
+ self.cardinality = cardinality.Cardinality.STREAM_UNARY
+ self.style = style.Service.EVENT
+
+ def stream_unary_event(self, response_callback, context):
+ request_consumer = self._test_method.service(
+ response_callback, context, self._control)
+ if self._pool is None:
+ return request_consumer
+ else:
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _InlineStreamStreamMethod(face.MethodImplementation):
+
+ def __init__(self, stream_stream_test_method, control):
+ self._test_method = stream_stream_test_method
+ self._control = control
+
+ self.cardinality = cardinality.Cardinality.STREAM_STREAM
+ self.style = style.Service.INLINE
+
+ def stream_stream_inline(self, request_iterator, context):
+ response_consumer = _BufferingConsumer()
+ request_consumer = self._test_method.service(
+ response_consumer, context, self._control)
+
+ for request in request_iterator:
+ request_consumer.consume(request)
+ while response_consumer.consumed:
+ yield response_consumer.consumed.pop(0)
+ response_consumer.terminate()
+
+
+class _EventStreamStreamMethod(face.MethodImplementation):
+
+ def __init__(self, stream_stream_test_method, control, pool):
+ self._test_method = stream_stream_test_method
+ self._control = control
+ self._pool = pool
+
+ self.cardinality = cardinality.Cardinality.STREAM_STREAM
+ self.style = style.Service.EVENT
+
+ def stream_stream_event(self, response_consumer, context):
+ request_consumer = self._test_method.service(
+ response_consumer, context, self._control)
+ if self._pool is None:
+ return request_consumer
+ else:
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _UnaryConsumer(stream.Consumer):
+ """A Consumer that only allows consumption of exactly one value."""
+
+ def __init__(self, action):
+ self._lock = threading.Lock()
+ self._action = action
+ self._consumed = False
+ self._terminated = False
+
+ def consume(self, value):
+ with self._lock:
+ if self._consumed:
+ raise ValueError('Unary consumer already consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._consumed = True
+
+ self._action(value)
+
+ def terminate(self):
+ with self._lock:
+ if not self._consumed:
+ raise ValueError('Unary consumer hasn\'t yet consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._terminated = True
+
+ def consume_and_terminate(self, value):
+ with self._lock:
+ if self._consumed:
+ raise ValueError('Unary consumer already consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._consumed = True
+ self._terminated = True
+
+ self._action(value)
+
+
+class _UnaryUnaryAdaptation(object):
+
+ def __init__(self, unary_unary_test_method):
+ self._method = unary_unary_test_method
+
+ def service(self, response_consumer, context, control):
+ def action(request):
+ self._method.service(
+ request, response_consumer.consume_and_terminate, context, control)
+ return _UnaryConsumer(action)
+
+
+class _UnaryStreamAdaptation(object):
+
+ def __init__(self, unary_stream_test_method):
+ self._method = unary_stream_test_method
+
+ def service(self, response_consumer, context, control):
+ def action(request):
+ self._method.service(request, response_consumer, context, control)
+ return _UnaryConsumer(action)
+
+
+class _StreamUnaryAdaptation(object):
+
+ def __init__(self, stream_unary_test_method):
+ self._method = stream_unary_test_method
+
+ def service(self, response_consumer, context, control):
+ return self._method.service(
+ response_consumer.consume_and_terminate, context, control)
+
+
+class _MultiMethodImplementation(face.MultiMethodImplementation):
+
+ def __init__(self, methods, control, pool):
+ self._methods = methods
+ self._control = control
+ self._pool = pool
+
+ def service(self, group, name, response_consumer, context):
+ method = self._methods.get(group, name, None)
+ if method is None:
+ raise face.NoSuchMethodError(group, name)
+ elif self._pool is None:
+ return method(response_consumer, context, self._control)
+ else:
+ request_consumer = method(response_consumer, context, self._control)
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _Assembly(
+ collections.namedtuple(
+ '_Assembly',
+ ['methods', 'inlines', 'events', 'adaptations', 'messages'])):
+ """An intermediate structure created when creating a TestServiceDigest."""
+
+
+def _assemble(
+ scenarios, identifiers, inline_method_constructor, event_method_constructor,
+ adapter, control, pool):
+ """Creates an _Assembly from the given scenarios."""
+ methods = {}
+ inlines = {}
+ events = {}
+ adaptations = {}
+ messages = {}
+ for identifier, scenario in six.iteritems(scenarios):
+ if identifier in identifiers:
+ raise ValueError('Repeated identifier "(%s, %s)"!' % identifier)
+
+ test_method = scenario[0]
+ inline_method = inline_method_constructor(test_method, control)
+ event_method = event_method_constructor(test_method, control, pool)
+ adaptation = adapter(test_method)
+
+ methods[identifier] = test_method
+ inlines[identifier] = inline_method
+ events[identifier] = event_method
+ adaptations[identifier] = adaptation
+ messages[identifier] = scenario[1]
+
+ return _Assembly(methods, inlines, events, adaptations, messages)
+
+
+def digest(service, control, pool):
+ """Creates a TestServiceDigest from a TestService.
+
+ Args:
+ service: A _service.TestService.
+ control: A test_control.Control.
+ pool: If RPC methods should be serviced in a separate thread, a thread pool.
+ None if RPC methods should be serviced in the thread belonging to the
+ run-time that calls for their service.
+
+ Returns:
+ A TestServiceDigest synthesized from the given service.TestService.
+ """
+ identifiers = set()
+
+ unary_unary = _assemble(
+ service.unary_unary_scenarios(), identifiers, _InlineUnaryUnaryMethod,
+ _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool)
+ identifiers.update(unary_unary.inlines)
+
+ unary_stream = _assemble(
+ service.unary_stream_scenarios(), identifiers, _InlineUnaryStreamMethod,
+ _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool)
+ identifiers.update(unary_stream.inlines)
+
+ stream_unary = _assemble(
+ service.stream_unary_scenarios(), identifiers, _InlineStreamUnaryMethod,
+ _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool)
+ identifiers.update(stream_unary.inlines)
+
+ stream_stream = _assemble(
+ service.stream_stream_scenarios(), identifiers, _InlineStreamStreamMethod,
+ _EventStreamStreamMethod, _IDENTITY, control, pool)
+ identifiers.update(stream_stream.inlines)
+
+ methods = dict(unary_unary.methods)
+ methods.update(unary_stream.methods)
+ methods.update(stream_unary.methods)
+ methods.update(stream_stream.methods)
+ adaptations = dict(unary_unary.adaptations)
+ adaptations.update(unary_stream.adaptations)
+ adaptations.update(stream_unary.adaptations)
+ adaptations.update(stream_stream.adaptations)
+ inlines = dict(unary_unary.inlines)
+ inlines.update(unary_stream.inlines)
+ inlines.update(stream_unary.inlines)
+ inlines.update(stream_stream.inlines)
+ events = dict(unary_unary.events)
+ events.update(unary_stream.events)
+ events.update(stream_unary.events)
+ events.update(stream_stream.events)
+
+ return TestServiceDigest(
+ methods,
+ inlines,
+ events,
+ _MultiMethodImplementation(adaptations, control, pool),
+ unary_unary.messages,
+ unary_stream.messages,
+ stream_unary.messages,
+ stream_stream.messages)
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
new file mode 100644
index 0000000000..791620307b
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -0,0 +1,480 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test code for the Face layer of RPC Framework."""
+
+from __future__ import division
+
+import abc
+import contextlib
+import itertools
+import threading
+import unittest
+from concurrent import futures
+
+import six
+
+# test_interfaces is referenced from specification in this module.
+from grpc.framework.foundation import logging_pool
+from grpc.framework.interfaces.face import face
+from tests.unit.framework.common import test_constants
+from tests.unit.framework.common import test_control
+from tests.unit.framework.common import test_coverage
+from tests.unit.framework.interfaces.face import _3069_test_constant
+from tests.unit.framework.interfaces.face import _digest
+from tests.unit.framework.interfaces.face import _stock_service
+from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
+
+
+class _PauseableIterator(object):
+
+ def __init__(self, upstream):
+ self._upstream = upstream
+ self._condition = threading.Condition()
+ self._paused = False
+
+ @contextlib.contextmanager
+ def pause(self):
+ with self._condition:
+ self._paused = True
+ yield
+ with self._condition:
+ self._paused = False
+ self._condition.notify_all()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return self.next()
+
+ def next(self):
+ with self._condition:
+ while self._paused:
+ self._condition.wait()
+ return next(self._upstream)
+
+
+class _Callback(object):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._called = False
+ self._passed_future = None
+ self._passed_other_stuff = None
+
+ def __call__(self, *args, **kwargs):
+ with self._condition:
+ self._called = True
+ if args:
+ self._passed_future = args[0]
+ if 1 < len(args) or kwargs:
+ self._passed_other_stuff = tuple(args[1:]), dict(kwargs)
+ self._condition.notify_all()
+
+ def future(self):
+ with self._condition:
+ while True:
+ if self._passed_other_stuff is not None:
+ raise ValueError(
+ 'Test callback passed unexpected values: %s',
+ self._passed_other_stuff)
+ elif self._called:
+ return self._passed_future
+ else:
+ self._condition.wait()
+
+
+class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.TestCase)):
+ """A test of the Face layer of RPC Framework.
+
+ Concrete subclasses must have an "implementation" attribute of type
+ test_interfaces.Implementation and an "invoker_constructor" attribute of type
+ _invocation.InvokerConstructor.
+ """
+
+ NAME = 'FutureInvocationAsynchronousEventServiceTest'
+
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self._control = test_control.PauseFailControl()
+ self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE)
+ self._digest = _digest.digest(
+ _stock_service.STOCK_TEST_SERVICE, self._control, self._digest_pool)
+
+ generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
+ self._digest.methods, self._digest.event_method_implementations, None)
+ self._invoker = self.invoker_constructor.construct_invoker(
+ generic_stub, dynamic_stubs, self._digest.methods)
+
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self._invoker = None
+ self.implementation.destantiate(self._memo)
+ self._digest_pool.shutdown(wait=True)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = _Callback()
+
+ response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ response_future.add_done_callback(callback)
+ response = response_future.result()
+
+ test_messages.verify(request, response, self)
+ self.assertIs(callback.future(), response_future)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_iterator = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ request_iterator = _PauseableIterator(iter(requests))
+ callback = _Callback()
+
+ # Use of a paused iterator of requests allows us to test that control is
+ # returned to calling code before the iterator yields any requests.
+ with request_iterator.pause():
+ response_future = self._invoker.future(group, method)(
+ request_iterator, test_constants.LONG_TIMEOUT)
+ response_future.add_done_callback(callback)
+ future_passed_to_callback = callback.future()
+ response = future_passed_to_callback.result()
+
+ test_messages.verify(requests, response, self)
+ self.assertIs(future_passed_to_callback, response_future)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ request_iterator = _PauseableIterator(iter(requests))
+
+ # Use of a paused iterator of requests allows us to test that control is
+ # returned to calling code before the iterator yields any requests.
+ with request_iterator.pause():
+ response_iterator = self._invoker.future(group, method)(
+ request_iterator, test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self._invoker.future(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+ first_response = first_response_future.result()
+
+ test_messages.verify(first_request, first_response, self)
+
+ second_response_future = self._invoker.future(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+ second_response = second_response_future.result()
+
+ test_messages.verify(second_request, second_response, self)
+
+ def testParallelInvocations(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self._invoker.future(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+ second_response_future = self._invoker.future(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+ first_response = first_response_future.result()
+ second_response = second_response_future.result()
+
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures = []
+ for _ in range(test_constants.THREAD_CONCURRENCY):
+ request = test_messages.request()
+ response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures.append(response_future)
+
+ responses = [
+ response_future.result() for response_future in response_futures]
+
+ for request, response in zip(requests, responses):
+ test_messages.verify(request, response, self)
+
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.THREAD_CONCURRENCY):
+ request = test_messages.request()
+ inner_response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ outer_response_future = pool.submit(inner_response_future.result)
+ requests.append(request)
+ response_futures_to_indices[outer_response_future] = index
+
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.THREAD_CONCURRENCY // 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index], response_future.result(), self)
+ pool.shutdown(wait=True)
+
+ def testCancelledUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = _Callback()
+
+ with self._control.pause():
+ response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ response_future.add_done_callback(callback)
+ cancel_method_return_value = response_future.cancel()
+
+ self.assertIs(callback.future(), response_future)
+ self.assertFalse(cancel_method_return_value)
+ self.assertTrue(response_future.cancelled())
+
+ def testCancelledUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ response_iterator.cancel()
+
+ with self.assertRaises(face.CancellationError):
+ next(response_iterator)
+
+ def testCancelledStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = _Callback()
+
+ with self._control.pause():
+ response_future = self._invoker.future(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ response_future.add_done_callback(callback)
+ cancel_method_return_value = response_future.cancel()
+
+ self.assertIs(callback.future(), response_future)
+ self.assertFalse(cancel_method_return_value)
+ self.assertTrue(response_future.cancelled())
+
+ def testCancelledStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ response_iterator.cancel()
+
+ with self.assertRaises(face.CancellationError):
+ next(response_iterator)
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = _Callback()
+
+ with self._control.pause():
+ response_future = self._invoker.future(
+ group, method)(request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ response_future.add_done_callback(callback)
+ self.assertIs(callback.future(), response_future)
+ self.assertIsInstance(
+ response_future.exception(), face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ with self.assertRaises(face.ExpirationError):
+ list(response_iterator)
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = _Callback()
+
+ with self._control.pause():
+ response_future = self._invoker.future(group, method)(
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ response_future.add_done_callback(callback)
+ self.assertIs(callback.future(), response_future)
+ self.assertIsInstance(
+ response_future.exception(), face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ with self.assertRaises(face.ExpirationError):
+ list(response_iterator)
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = _Callback()
+
+ with self._control.fail():
+ response_future = self._invoker.future(group, method)(
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ response_future.add_done_callback(callback)
+
+ self.assertIs(callback.future(), response_future)
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is
+ # indistinguishable from simply not having called its
+ # response_callback before the expiration of the RPC.
+ self.assertIsInstance(
+ response_future.exception(), face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_consumer before the
+ # expiration of the RPC.
+ with self._control.fail(), self.assertRaises(face.ExpirationError):
+ response_iterator = self._invoker.future(group, method)(
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ list(response_iterator)
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = _Callback()
+
+ with self._control.fail():
+ response_future = self._invoker.future(group, method)(
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ response_future.add_done_callback(callback)
+
+ self.assertIs(callback.future(), response_future)
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is
+ # indistinguishable from simply not having called its
+ # response_callback before the expiration of the RPC.
+ self.assertIsInstance(
+ response_future.exception(), face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+
+ def testFailedStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_consumer before the
+ # expiration of the RPC.
+ with self._control.fail(), self.assertRaises(face.ExpirationError):
+ response_iterator = self._invoker.future(group, method)(
+ iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ list(response_iterator)
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py
new file mode 100644
index 0000000000..ac487bed4f
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py
@@ -0,0 +1,213 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Coverage across the Face layer's generic-to-dynamic range for invocation."""
+
+import abc
+
+import six
+
+from grpc.framework.common import cardinality
+
+_CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR = {
+ cardinality.Cardinality.UNARY_UNARY: 'blocking_unary_unary',
+ cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream',
+ cardinality.Cardinality.STREAM_UNARY: 'blocking_stream_unary',
+ cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream',
+}
+
+_CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR = {
+ cardinality.Cardinality.UNARY_UNARY: 'future_unary_unary',
+ cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream',
+ cardinality.Cardinality.STREAM_UNARY: 'future_stream_unary',
+ cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream',
+}
+
+_CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR = {
+ cardinality.Cardinality.UNARY_UNARY: 'event_unary_unary',
+ cardinality.Cardinality.UNARY_STREAM: 'event_unary_stream',
+ cardinality.Cardinality.STREAM_UNARY: 'event_stream_unary',
+ cardinality.Cardinality.STREAM_STREAM: 'event_stream_stream',
+}
+
+_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = {
+ cardinality.Cardinality.UNARY_UNARY: 'unary_unary',
+ cardinality.Cardinality.UNARY_STREAM: 'unary_stream',
+ cardinality.Cardinality.STREAM_UNARY: 'stream_unary',
+ cardinality.Cardinality.STREAM_STREAM: 'stream_stream',
+}
+
+
+class Invoker(six.with_metaclass(abc.ABCMeta)):
+ """A type used to invoke test RPCs."""
+
+ @abc.abstractmethod
+ def blocking(self, group, name):
+ """Invokes an RPC with blocking control flow."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def future(self, group, name):
+ """Invokes an RPC with future control flow."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event(self, group, name):
+ """Invokes an RPC with event control flow."""
+ raise NotImplementedError()
+
+
+class InvokerConstructor(six.with_metaclass(abc.ABCMeta)):
+ """A type used to create Invokers."""
+
+ @abc.abstractmethod
+ def name(self):
+ """Specifies the name of the Invoker constructed by this object."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def construct_invoker(self, generic_stub, dynamic_stubs, methods):
+ """Constructs an Invoker for the given stubs and methods."""
+ raise NotImplementedError()
+
+
+class _GenericInvoker(Invoker):
+
+ def __init__(self, generic_stub, methods):
+ self._stub = generic_stub
+ self._methods = methods
+
+ def _behavior(self, group, name, cardinality_to_generic_method):
+ method_cardinality = self._methods[group, name].cardinality()
+ behavior = getattr(
+ self._stub, cardinality_to_generic_method[method_cardinality])
+ return lambda *args, **kwargs: behavior(group, name, *args, **kwargs)
+
+ def blocking(self, group, name):
+ return self._behavior(
+ group, name, _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR)
+
+ def future(self, group, name):
+ return self._behavior(group, name, _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR)
+
+ def event(self, group, name):
+ return self._behavior(group, name, _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR)
+
+
+class _GenericInvokerConstructor(InvokerConstructor):
+
+ def name(self):
+ return 'GenericInvoker'
+
+ def construct_invoker(self, generic_stub, dynamic_stub, methods):
+ return _GenericInvoker(generic_stub, methods)
+
+
+class _MultiCallableInvoker(Invoker):
+
+ def __init__(self, generic_stub, methods):
+ self._stub = generic_stub
+ self._methods = methods
+
+ def _multi_callable(self, group, name):
+ method_cardinality = self._methods[group, name].cardinality()
+ behavior = getattr(
+ self._stub,
+ _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
+ return behavior(group, name)
+
+ def blocking(self, group, name):
+ return self._multi_callable(group, name)
+
+ def future(self, group, name):
+ method_cardinality = self._methods[group, name].cardinality()
+ behavior = getattr(
+ self._stub,
+ _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
+ if method_cardinality in (
+ cardinality.Cardinality.UNARY_UNARY,
+ cardinality.Cardinality.STREAM_UNARY):
+ return behavior(group, name).future
+ else:
+ return behavior(group, name)
+
+ def event(self, group, name):
+ return self._multi_callable(group, name).event
+
+
+class _MultiCallableInvokerConstructor(InvokerConstructor):
+
+ def name(self):
+ return 'MultiCallableInvoker'
+
+ def construct_invoker(self, generic_stub, dynamic_stub, methods):
+ return _MultiCallableInvoker(generic_stub, methods)
+
+
+class _DynamicInvoker(Invoker):
+
+ def __init__(self, dynamic_stubs, methods):
+ self._stubs = dynamic_stubs
+ self._methods = methods
+
+ def blocking(self, group, name):
+ return getattr(self._stubs[group], name)
+
+ def future(self, group, name):
+ if self._methods[group, name].cardinality() in (
+ cardinality.Cardinality.UNARY_UNARY,
+ cardinality.Cardinality.STREAM_UNARY):
+ return getattr(self._stubs[group], name).future
+ else:
+ return getattr(self._stubs[group], name)
+
+ def event(self, group, name):
+ return getattr(self._stubs[group], name).event
+
+
+class _DynamicInvokerConstructor(InvokerConstructor):
+
+ def name(self):
+ return 'DynamicInvoker'
+
+ def construct_invoker(self, generic_stub, dynamic_stubs, methods):
+ return _DynamicInvoker(dynamic_stubs, methods)
+
+
+def invoker_constructors():
+ """Creates a sequence of InvokerConstructors to use in tests of RPCs.
+
+ Returns:
+ A sequence of InvokerConstructors.
+ """
+ return (
+ _GenericInvokerConstructor(),
+ _MultiCallableInvokerConstructor(),
+ _DynamicInvokerConstructor(),
+ )
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_receiver.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_receiver.py
new file mode 100644
index 0000000000..48f31fc677
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_receiver.py
@@ -0,0 +1,95 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A utility useful in tests of asynchronous, event-driven interfaces."""
+
+import threading
+
+from grpc.framework.interfaces.face import face
+
+
+class Receiver(face.ResponseReceiver):
+ """A utility object useful in tests of asynchronous code."""
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._initial_metadata = None
+ self._responses = []
+ self._terminal_metadata = None
+ self._code = None
+ self._details = None
+ self._completed = False
+ self._abortion = None
+
+ def abort(self, abortion):
+ with self._condition:
+ self._abortion = abortion
+ self._condition.notify_all()
+
+ def initial_metadata(self, initial_metadata):
+ with self._condition:
+ self._initial_metadata = initial_metadata
+
+ def response(self, response):
+ with self._condition:
+ self._responses.append(response)
+
+ def complete(self, terminal_metadata, code, details):
+ with self._condition:
+ self._terminal_metadata = terminal_metadata
+ self._code = code
+ self._details = details
+ self._completed = True
+ self._condition.notify_all()
+
+ def block_until_terminated(self):
+ with self._condition:
+ while self._abortion is None and not self._completed:
+ self._condition.wait()
+
+ def unary_response(self):
+ with self._condition:
+ if self._abortion is not None:
+ raise AssertionError('Aborted: "{}"!'.format(self._abortion))
+ elif len(self._responses) != 1:
+ raise AssertionError(
+ '%d responses received, not exactly one!', len(self._responses))
+ else:
+ return self._responses[0]
+
+ def stream_responses(self):
+ with self._condition:
+ if self._abortion is None:
+ return list(self._responses)
+ else:
+ raise AssertionError('Aborted: "{}"!'.format(self._abortion))
+
+ def abortion(self):
+ with self._condition:
+ return self._abortion
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py
new file mode 100644
index 0000000000..f13dff0558
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py
@@ -0,0 +1,316 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Private interfaces implemented by data sets used in Face-layer tests."""
+
+import abc
+
+import six
+
+# face is referenced from specification in this module.
+from grpc.framework.interfaces.face import face # pylint: disable=unused-import
+from tests.unit.framework.interfaces.face import test_interfaces
+
+
+class UnaryUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
+ """A controllable implementation of a unary-unary method."""
+
+ @abc.abstractmethod
+ def service(self, request, response_callback, context, control):
+ """Services an RPC that accepts one message and produces one message.
+
+ Args:
+ request: The single request message for the RPC.
+ response_callback: A callback to be called to accept the response message
+ of the RPC.
+ context: An face.ServicerContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
+ """A type for unary-request-unary-response message pairings."""
+
+ @abc.abstractmethod
+ def request(self):
+ """Affords a request message.
+
+ Implementations of this method should return a different message with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A request message.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, request, response, test_case):
+ """Verifies that the computed response matches the given request.
+
+ Args:
+ request: A request message.
+ response: A response message.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the request and response do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
+ """A controllable implementation of a unary-stream method."""
+
+ @abc.abstractmethod
+ def service(self, request, response_consumer, context, control):
+ """Services an RPC that takes one message and produces a stream of messages.
+
+ Args:
+ request: The single request message for the RPC.
+ response_consumer: A stream.Consumer to be called to accept the response
+ messages of the RPC.
+ context: A face.ServicerContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
+ """A type for unary-request-stream-response message pairings."""
+
+ @abc.abstractmethod
+ def request(self):
+ """Affords a request message.
+
+ Implementations of this method should return a different message with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A request message.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, request, responses, test_case):
+ """Verifies that the computed responses match the given request.
+
+ Args:
+ request: A request message.
+ responses: A sequence of response messages.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the request and responses do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
+ """A controllable implementation of a stream-unary method."""
+
+ @abc.abstractmethod
+ def service(self, response_callback, context, control):
+ """Services an RPC that takes a stream of messages and produces one message.
+
+ Args:
+ response_callback: A callback to be called to accept the response message
+ of the RPC.
+ context: A face.ServicerContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Returns:
+ A stream.Consumer with which to accept the request messages of the RPC.
+ The consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing messages to this object. Implementations must not assume that
+ this object will be called to completion of the request stream or even
+ called at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
+ """A type for stream-request-unary-response message pairings."""
+
+ @abc.abstractmethod
+ def requests(self):
+ """Affords a sequence of request messages.
+
+ Implementations of this method should return a different sequences with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A sequence of request messages.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, requests, response, test_case):
+ """Verifies that the computed response matches the given requests.
+
+ Args:
+ requests: A sequence of request messages.
+ response: A response message.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the requests and response do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
+ """A controllable implementation of a stream-stream method."""
+
+ @abc.abstractmethod
+ def service(self, response_consumer, context, control):
+ """Services an RPC that accepts and produces streams of messages.
+
+ Args:
+ response_consumer: A stream.Consumer to be called to accept the response
+ messages of the RPC.
+ context: A face.ServicerContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Returns:
+ A stream.Consumer with which to accept the request messages of the RPC.
+ The consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing messages to this object. Implementations must not assume that
+ this object will be called to completion of the request stream or even
+ called at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
+ """A type for stream-request-stream-response message pairings."""
+
+ @abc.abstractmethod
+ def requests(self):
+ """Affords a sequence of request messages.
+
+ Implementations of this method should return a different sequences with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A sequence of request messages.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, requests, responses, test_case):
+ """Verifies that the computed response matches the given requests.
+
+ Args:
+ requests: A sequence of request messages.
+ responses: A sequence of response messages.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the requests and responses do not match, indicating
+ that there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class TestService(six.with_metaclass(abc.ABCMeta)):
+ """A specification of implemented methods to use in tests."""
+
+ @abc.abstractmethod
+ def unary_unary_scenarios(self):
+ """Affords unary-request-unary-response test methods and their messages.
+
+ Returns:
+ A dict from method group-name pair to implementation/messages pair. The
+ first element of the pair is a UnaryUnaryTestMethodImplementation object
+ and the second element is a sequence of UnaryUnaryTestMethodMessages
+ objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_stream_scenarios(self):
+ """Affords unary-request-stream-response test methods and their messages.
+
+ Returns:
+ A dict from method group-name pair to implementation/messages pair. The
+ first element of the pair is a UnaryStreamTestMethodImplementation
+ object and the second element is a sequence of
+ UnaryStreamTestMethodMessages objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_unary_scenarios(self):
+ """Affords stream-request-unary-response test methods and their messages.
+
+ Returns:
+ A dict from method group-name pair to implementation/messages pair. The
+ first element of the pair is a StreamUnaryTestMethodImplementation
+ object and the second element is a sequence of
+ StreamUnaryTestMethodMessages objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_stream_scenarios(self):
+ """Affords stream-request-stream-response test methods and their messages.
+
+ Returns:
+ A dict from method group-name pair to implementation/messages pair. The
+ first element of the pair is a StreamStreamTestMethodImplementation
+ object and the second element is a sequence of
+ StreamStreamTestMethodMessages objects.
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py
new file mode 100644
index 0000000000..5299655bb3
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py
@@ -0,0 +1,396 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Examples of Python implementations of the stock.proto Stock service."""
+
+from grpc.framework.common import cardinality
+from grpc.framework.foundation import abandonment
+from grpc.framework.foundation import stream
+from tests.unit.framework.common import test_constants
+from tests.unit.framework.interfaces.face import _service
+from tests.unit._junkdrawer import stock_pb2
+
+_STOCK_GROUP_NAME = 'Stock'
+_SYMBOL_FORMAT = 'test symbol:%03d'
+
+# A test-appropriate security-pricing function. :-P
+_price = lambda symbol_name: float(hash(symbol_name) % 4096)
+
+
+def _get_last_trade_price(stock_request, stock_reply_callback, control, active):
+ """A unary-request, unary-response test method."""
+ control.control()
+ if active():
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=_price(stock_request.symbol)))
+ else:
+ raise abandonment.Abandoned()
+
+
+def _get_last_trade_price_multiple(stock_reply_consumer, control, active):
+ """A stream-request, stream-response test method."""
+ def stock_reply_for_stock_request(stock_request):
+ control.control()
+ if active():
+ return stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=_price(stock_request.symbol))
+ else:
+ raise abandonment.Abandoned()
+
+ class StockRequestConsumer(stream.Consumer):
+
+ def consume(self, stock_request):
+ stock_reply_consumer.consume(stock_reply_for_stock_request(stock_request))
+
+ def terminate(self):
+ control.control()
+ stock_reply_consumer.terminate()
+
+ def consume_and_terminate(self, stock_request):
+ stock_reply_consumer.consume_and_terminate(
+ stock_reply_for_stock_request(stock_request))
+
+ return StockRequestConsumer()
+
+
+def _watch_future_trades(stock_request, stock_reply_consumer, control, active):
+ """A unary-request, stream-response test method."""
+ base_price = _price(stock_request.symbol)
+ for index in range(stock_request.num_trades_to_watch):
+ control.control()
+ if active():
+ stock_reply_consumer.consume(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=base_price + index))
+ else:
+ raise abandonment.Abandoned()
+ stock_reply_consumer.terminate()
+
+
+def _get_highest_trade_price(stock_reply_callback, control, active):
+ """A stream-request, unary-response test method."""
+
+ class StockRequestConsumer(stream.Consumer):
+ """Keeps an ongoing record of the most valuable symbol yet consumed."""
+
+ def __init__(self):
+ self._symbol = None
+ self._price = None
+
+ def consume(self, stock_request):
+ control.control()
+ if active():
+ if self._price is None:
+ self._symbol = stock_request.symbol
+ self._price = _price(stock_request.symbol)
+ else:
+ candidate_price = _price(stock_request.symbol)
+ if self._price < candidate_price:
+ self._symbol = stock_request.symbol
+ self._price = candidate_price
+
+ def terminate(self):
+ control.control()
+ if active():
+ if self._symbol is None:
+ raise ValueError()
+ else:
+ stock_reply_callback(
+ stock_pb2.StockReply(symbol=self._symbol, price=self._price))
+ self._symbol = None
+ self._price = None
+
+ def consume_and_terminate(self, stock_request):
+ control.control()
+ if active():
+ if self._price is None:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol,
+ price=_price(stock_request.symbol)))
+ else:
+ candidate_price = _price(stock_request.symbol)
+ if self._price < candidate_price:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=candidate_price))
+ else:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=self._symbol, price=self._price))
+
+ self._symbol = None
+ self._price = None
+
+ return StockRequestConsumer()
+
+
+class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation):
+ """GetLastTradePrice for use in tests."""
+
+ def group(self):
+ return _STOCK_GROUP_NAME
+
+ def name(self):
+ return 'GetLastTradePrice'
+
+ def cardinality(self):
+ return cardinality.Cardinality.UNARY_UNARY
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, request, response_callback, context, control):
+ _get_last_trade_price(
+ request, response_callback, control, context.is_active)
+
+
+class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages):
+
+ def __init__(self):
+ self._index = 0
+
+ def request(self):
+ symbol = _SYMBOL_FORMAT % self._index
+ self._index += 1
+ return stock_pb2.StockRequest(symbol=symbol)
+
+ def verify(self, request, response, test_case):
+ test_case.assertEqual(request.symbol, response.symbol)
+ test_case.assertEqual(_price(request.symbol), response.price)
+
+
+class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation):
+ """GetLastTradePriceMultiple for use in tests."""
+
+ def group(self):
+ return _STOCK_GROUP_NAME
+
+ def name(self):
+ return 'GetLastTradePriceMultiple'
+
+ def cardinality(self):
+ return cardinality.Cardinality.STREAM_STREAM
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, response_consumer, context, control):
+ return _get_last_trade_price_multiple(
+ response_consumer, control, context.is_active)
+
+
+class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages):
+ """Pairs of message streams for use with GetLastTradePriceMultiple."""
+
+ def __init__(self):
+ self._index = 0
+
+ def requests(self):
+ base_index = self._index
+ self._index += 1
+ return [
+ stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index))
+ for index in range(test_constants.STREAM_LENGTH)]
+
+ def verify(self, requests, responses, test_case):
+ test_case.assertEqual(len(requests), len(responses))
+ for stock_request, stock_reply in zip(requests, responses):
+ test_case.assertEqual(stock_request.symbol, stock_reply.symbol)
+ test_case.assertEqual(_price(stock_request.symbol), stock_reply.price)
+
+
+class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation):
+ """WatchFutureTrades for use in tests."""
+
+ def group(self):
+ return _STOCK_GROUP_NAME
+
+ def name(self):
+ return 'WatchFutureTrades'
+
+ def cardinality(self):
+ return cardinality.Cardinality.UNARY_STREAM
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, request, response_consumer, context, control):
+ _watch_future_trades(request, response_consumer, control, context.is_active)
+
+
+class WatchFutureTradesMessages(_service.UnaryStreamTestMessages):
+ """Pairs of a single request message and a sequence of response messages."""
+
+ def __init__(self):
+ self._index = 0
+
+ def request(self):
+ symbol = _SYMBOL_FORMAT % self._index
+ self._index += 1
+ return stock_pb2.StockRequest(
+ symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH)
+
+ def verify(self, request, responses, test_case):
+ test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses))
+ base_price = _price(request.symbol)
+ for index, response in enumerate(responses):
+ test_case.assertEqual(base_price + index, response.price)
+
+
+class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation):
+ """GetHighestTradePrice for use in tests."""
+
+ def group(self):
+ return _STOCK_GROUP_NAME
+
+ def name(self):
+ return 'GetHighestTradePrice'
+
+ def cardinality(self):
+ return cardinality.Cardinality.STREAM_UNARY
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, response_callback, context, control):
+ return _get_highest_trade_price(
+ response_callback, control, context.is_active)
+
+
+class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages):
+
+ def requests(self):
+ return [
+ stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index)
+ for index in range(test_constants.STREAM_LENGTH)]
+
+ def verify(self, requests, response, test_case):
+ price = None
+ symbol = None
+ for stock_request in requests:
+ current_symbol = stock_request.symbol
+ current_price = _price(current_symbol)
+ if price is None or price < current_price:
+ price = current_price
+ symbol = current_symbol
+ test_case.assertEqual(price, response.price)
+ test_case.assertEqual(symbol, response.symbol)
+
+
+class StockTestService(_service.TestService):
+ """A corpus of test data with one method of each RPC cardinality."""
+
+ def unary_unary_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'GetLastTradePrice'): (
+ GetLastTradePrice(), [GetLastTradePriceMessages()]),
+ }
+
+ def unary_stream_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'WatchFutureTrades'): (
+ WatchFutureTrades(), [WatchFutureTradesMessages()]),
+ }
+
+ def stream_unary_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'GetHighestTradePrice'): (
+ GetHighestTradePrice(), [GetHighestTradePriceMessages()])
+ }
+
+ def stream_stream_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'): (
+ GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]),
+ }
+
+
+STOCK_TEST_SERVICE = StockTestService()
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py
new file mode 100644
index 0000000000..71de9d835e
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py
@@ -0,0 +1,67 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tools for creating tests of implementations of the Face layer."""
+
+# unittest is referenced from specification in this module.
+import unittest # pylint: disable=unused-import
+
+# test_interfaces is referenced from specification in this module.
+from tests.unit.framework.interfaces.face import _blocking_invocation_inline_service
+from tests.unit.framework.interfaces.face import _future_invocation_asynchronous_event_service
+from tests.unit.framework.interfaces.face import _invocation
+from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
+
+_TEST_CASE_SUPERCLASSES = (
+ _blocking_invocation_inline_service.TestCase,
+ _future_invocation_asynchronous_event_service.TestCase,
+)
+
+
+def test_cases(implementation):
+ """Creates unittest.TestCase classes for a given Face layer implementation.
+
+ Args:
+ implementation: A test_interfaces.Implementation specifying creation and
+ destruction of a given Face layer implementation.
+
+ Returns:
+ A sequence of subclasses of unittest.TestCase defining tests of the
+ specified Face layer implementation.
+ """
+ test_case_classes = []
+ for invoker_constructor in _invocation.invoker_constructors():
+ for super_class in _TEST_CASE_SUPERCLASSES:
+ test_case_classes.append(
+ type(invoker_constructor.name() + super_class.NAME, (super_class,),
+ {'implementation': implementation,
+ 'invoker_constructor': invoker_constructor,
+ '__module__': implementation.__module__,
+ }))
+ return test_case_classes
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py
new file mode 100644
index 0000000000..40f38e68ba
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py
@@ -0,0 +1,229 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces used in tests of implementations of the Face layer."""
+
+import abc
+
+import six
+
+from grpc.framework.common import cardinality # pylint: disable=unused-import
+from grpc.framework.interfaces.face import face # pylint: disable=unused-import
+
+
+class Method(six.with_metaclass(abc.ABCMeta)):
+ """Specifies a method to be used in tests."""
+
+ @abc.abstractmethod
+ def group(self):
+ """Identify the group of the method.
+
+ Returns:
+ The group of the method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def name(self):
+ """Identify the name of the method.
+
+ Returns:
+ The name of the method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cardinality(self):
+ """Identify the cardinality of the method.
+
+ Returns:
+ A cardinality.Cardinality value describing the streaming semantics of the
+ method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def request_class(self):
+ """Identify the class used for the method's request objects.
+
+ Returns:
+ The class object of the class to which the method's request objects
+ belong.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def response_class(self):
+ """Identify the class used for the method's response objects.
+
+ Returns:
+ The class object of the class to which the method's response objects
+ belong.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_request(self, request):
+ """Serialize the given request object.
+
+ Args:
+ request: A request object appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_request(self, serialized_request):
+ """Synthesize a request object from a given bytestring.
+
+ Args:
+ serialized_request: A bytestring deserializable into a request object
+ appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_response(self, response):
+ """Serialize the given response object.
+
+ Args:
+ response: A response object appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_response(self, serialized_response):
+ """Synthesize a response object from a given bytestring.
+
+ Args:
+ serialized_response: A bytestring deserializable into a response object
+ appropriate for this method.
+ """
+ raise NotImplementedError()
+
+
+class Implementation(six.with_metaclass(abc.ABCMeta)):
+ """Specifies an implementation of the Face layer."""
+
+ @abc.abstractmethod
+ def instantiate(
+ self, methods, method_implementations,
+ multi_method_implementation):
+ """Instantiates the Face layer implementation to be used in a test.
+
+ Args:
+ methods: A sequence of Method objects describing the methods available to
+ be called during the test.
+ method_implementations: A dictionary from group-name pair to
+ face.MethodImplementation object specifying implementation of a method.
+ multi_method_implementation: A face.MultiMethodImplementation or None.
+
+ Returns:
+ A sequence of length three the first element of which is a
+ face.GenericStub, the second element of which is dictionary from groups
+ to face.DynamicStubs affording invocation of the group's methods, and
+ the third element of which is an arbitrary memo object to be kept and
+ passed to destantiate at the conclusion of the test. The returned stubs
+ must be backed by the provided implementations.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def destantiate(self, memo):
+ """Destroys the Face layer implementation under test.
+
+ Args:
+ memo: The object from the third position of the return value of a call to
+ instantiate.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invocation_metadata(self):
+ """Provides the metadata to be used when invoking a test RPC.
+
+ Returns:
+ An object to use as the supplied-at-invocation-time metadata in a test
+ RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Provides the metadata for use as a test RPC's first servicer metadata.
+
+ Returns:
+ An object to use as the from-the-servicer-before-responses metadata in a
+ test RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminal_metadata(self):
+ """Provides the metadata for use as a test RPC's second servicer metadata.
+
+ Returns:
+ An object to use as the from-the-servicer-after-all-responses metadata in
+ a test RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def code(self):
+ """Provides the value for use as a test RPC's code.
+
+ Returns:
+ An object to use as the from-the-servicer code in a test RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def details(self):
+ """Provides the value for use as a test RPC's details.
+
+ Returns:
+ An object to use as the from-the-servicer details in a test RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ """Identifies whether or not metadata was properly transmitted.
+
+ Args:
+ original_metadata: A metadata value passed to the Face interface
+ implementation under test.
+ transmitted_metadata: The same metadata value after having been
+ transmitted via an RPC performed by the Face interface implementation
+ under test.
+
+ Returns:
+ Whether or not the metadata was properly transmitted by the Face interface
+ implementation under test.
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/links/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/links/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/links/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/links/test_cases.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/links/test_cases.py
new file mode 100644
index 0000000000..608e64119e
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/links/test_cases.py
@@ -0,0 +1,327 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of the links interface of RPC Framework."""
+
+# unittest is referenced from specification in this module.
+import abc
+import unittest # pylint: disable=unused-import
+
+import six
+
+from grpc.framework.interfaces.links import links
+from tests.unit.framework.common import test_constants
+from tests.unit.framework.interfaces.links import test_utilities
+
+
+def at_least_n_payloads_received_predicate(n):
+ def predicate(ticket_sequence):
+ payload_count = 0
+ for ticket in ticket_sequence:
+ if ticket.payload is not None:
+ payload_count += 1
+ if n <= payload_count:
+ return True
+ else:
+ return False
+ return predicate
+
+
+def terminated(ticket_sequence):
+ return ticket_sequence and ticket_sequence[-1].termination is not None
+
+_TRANSMISSION_GROUP = 'test.Group'
+_TRANSMISSION_METHOD = 'TestMethod'
+
+
+class TransmissionTest(six.with_metaclass(abc.ABCMeta)):
+ """Tests ticket transmission between two connected links.
+
+ This class must be mixed into a unittest.TestCase that implements the abstract
+ methods it provides.
+ """
+
+ # This is a unittest.TestCase mix-in.
+ # pylint: disable=invalid-name
+
+ @abc.abstractmethod
+ def create_transmitting_links(self):
+ """Creates two connected links for use in this test.
+
+ Returns:
+ Two links.Links, the first of which will be used on the invocation side
+ of RPCs and the second of which will be used on the service side of
+ RPCs.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def destroy_transmitting_links(self, invocation_side_link, service_side_link):
+ """Destroys the two connected links created for this test.
+
+
+ Args:
+ invocation_side_link: The link used on the invocation side of RPCs in
+ this test.
+ service_side_link: The link used on the service side of RPCs in this
+ test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def create_invocation_initial_metadata(self):
+ """Creates a value for use as invocation-side initial metadata.
+
+ Returns:
+ A metadata value appropriate for use as invocation-side initial metadata
+ or None if invocation-side initial metadata transmission is not
+ supported by the links under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def create_invocation_terminal_metadata(self):
+ """Creates a value for use as invocation-side terminal metadata.
+
+ Returns:
+ A metadata value appropriate for use as invocation-side terminal
+ metadata or None if invocation-side terminal metadata transmission is
+ not supported by the links under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def create_service_initial_metadata(self):
+ """Creates a value for use as service-side initial metadata.
+
+ Returns:
+ A metadata value appropriate for use as service-side initial metadata or
+ None if service-side initial metadata transmission is not supported by
+ the links under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def create_service_terminal_metadata(self):
+ """Creates a value for use as service-side terminal metadata.
+
+ Returns:
+ A metadata value appropriate for use as service-side terminal metadata or
+ None if service-side terminal metadata transmission is not supported by
+ the links under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def create_invocation_completion(self):
+ """Creates values for use as invocation-side code and message.
+
+ Returns:
+ An invocation-side code value and an invocation-side message value.
+ Either or both may be None if invocation-side code and/or
+ invocation-side message transmission is not supported by the links
+ under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def create_service_completion(self):
+ """Creates values for use as service-side code and message.
+
+ Returns:
+ A service-side code value and a service-side message value. Either or
+ both may be None if service-side code and/or service-side message
+ transmission is not supported by the links under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def assertMetadataTransmitted(self, original_metadata, transmitted_metadata):
+ """Asserts that transmitted_metadata contains original_metadata.
+
+ Args:
+ original_metadata: A metadata object used in this test.
+ transmitted_metadata: A metadata object obtained after transmission
+ through the system under test.
+
+ Raises:
+ AssertionError: if the transmitted_metadata object does not contain
+ original_metadata.
+ """
+ raise NotImplementedError()
+
+ def group_and_method(self):
+ """Returns the group and method used in this test case.
+
+ Returns:
+ A pair of the group and method used in this test case.
+ """
+ return _TRANSMISSION_GROUP, _TRANSMISSION_METHOD
+
+ def serialize_request(self, request):
+ """Serializes a request value used in this test case.
+
+ Args:
+ request: A request value created by this test case.
+
+ Returns:
+ A bytestring that is the serialization of the given request.
+ """
+ return request
+
+ def deserialize_request(self, serialized_request):
+ """Deserializes a request value used in this test case.
+
+ Args:
+ serialized_request: A bytestring that is the serialization of some request
+ used in this test case.
+
+ Returns:
+ The request value encoded by the given bytestring.
+ """
+ return serialized_request
+
+ def serialize_response(self, response):
+ """Serializes a response value used in this test case.
+
+ Args:
+ response: A response value created by this test case.
+
+ Returns:
+ A bytestring that is the serialization of the given response.
+ """
+ return response
+
+ def deserialize_response(self, serialized_response):
+ """Deserializes a response value used in this test case.
+
+ Args:
+ serialized_response: A bytestring that is the serialization of some
+ response used in this test case.
+
+ Returns:
+ The response value encoded by the given bytestring.
+ """
+ return serialized_response
+
+ def _assert_is_valid_metadata_payload_sequence(
+ self, ticket_sequence, payloads, initial_metadata, terminal_metadata):
+ initial_metadata_seen = False
+ seen_payloads = []
+ terminal_metadata_seen = False
+
+ for ticket in ticket_sequence:
+ if ticket.initial_metadata is not None:
+ self.assertFalse(initial_metadata_seen)
+ self.assertFalse(seen_payloads)
+ self.assertFalse(terminal_metadata_seen)
+ self.assertMetadataTransmitted(initial_metadata, ticket.initial_metadata)
+ initial_metadata_seen = True
+
+ if ticket.payload is not None:
+ self.assertFalse(terminal_metadata_seen)
+ seen_payloads.append(ticket.payload)
+
+ if ticket.terminal_metadata is not None:
+ self.assertFalse(terminal_metadata_seen)
+ self.assertMetadataTransmitted(terminal_metadata, ticket.terminal_metadata)
+ terminal_metadata_seen = True
+ self.assertSequenceEqual(payloads, seen_payloads)
+
+ def _assert_is_valid_invocation_sequence(
+ self, ticket_sequence, group, method, payloads, initial_metadata,
+ terminal_metadata, termination):
+ self.assertLess(0, len(ticket_sequence))
+ self.assertEqual(group, ticket_sequence[0].group)
+ self.assertEqual(method, ticket_sequence[0].method)
+ self._assert_is_valid_metadata_payload_sequence(
+ ticket_sequence, payloads, initial_metadata, terminal_metadata)
+ self.assertIs(termination, ticket_sequence[-1].termination)
+
+ def _assert_is_valid_service_sequence(
+ self, ticket_sequence, payloads, initial_metadata, terminal_metadata,
+ code, message, termination):
+ self.assertLess(0, len(ticket_sequence))
+ self._assert_is_valid_metadata_payload_sequence(
+ ticket_sequence, payloads, initial_metadata, terminal_metadata)
+ self.assertEqual(code, ticket_sequence[-1].code)
+ self.assertEqual(message, ticket_sequence[-1].message)
+ self.assertIs(termination, ticket_sequence[-1].termination)
+
+ def setUp(self):
+ self._invocation_link, self._service_link = self.create_transmitting_links()
+ self._invocation_mate = test_utilities.RecordingLink()
+ self._service_mate = test_utilities.RecordingLink()
+ self._invocation_link.join_link(self._invocation_mate)
+ self._service_link.join_link(self._service_mate)
+
+ def tearDown(self):
+ self.destroy_transmitting_links(self._invocation_link, self._service_link)
+
+ def testSimplestRoundTrip(self):
+ """Tests transmission of one ticket in each direction."""
+ invocation_operation_id = object()
+ invocation_payload = b'\x07' * 1023
+ timeout = test_constants.LONG_TIMEOUT
+ invocation_initial_metadata = self.create_invocation_initial_metadata()
+ invocation_terminal_metadata = self.create_invocation_terminal_metadata()
+ invocation_code, invocation_message = self.create_invocation_completion()
+ service_payload = b'\x08' * 1025
+ service_initial_metadata = self.create_service_initial_metadata()
+ service_terminal_metadata = self.create_service_terminal_metadata()
+ service_code, service_message = self.create_service_completion()
+
+ original_invocation_ticket = links.Ticket(
+ invocation_operation_id, 0, _TRANSMISSION_GROUP, _TRANSMISSION_METHOD,
+ links.Ticket.Subscription.FULL, timeout, 0, invocation_initial_metadata,
+ invocation_payload, invocation_terminal_metadata, invocation_code,
+ invocation_message, links.Ticket.Termination.COMPLETION, None)
+ self._invocation_link.accept_ticket(original_invocation_ticket)
+
+ self._service_mate.block_until_tickets_satisfy(
+ at_least_n_payloads_received_predicate(1))
+ service_operation_id = self._service_mate.tickets()[0].operation_id
+
+ self._service_mate.block_until_tickets_satisfy(terminated)
+ self._assert_is_valid_invocation_sequence(
+ self._service_mate.tickets(), _TRANSMISSION_GROUP, _TRANSMISSION_METHOD,
+ (invocation_payload,), invocation_initial_metadata,
+ invocation_terminal_metadata, links.Ticket.Termination.COMPLETION)
+
+ original_service_ticket = links.Ticket(
+ service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
+ timeout, 0, service_initial_metadata, service_payload,
+ service_terminal_metadata, service_code, service_message,
+ links.Ticket.Termination.COMPLETION, None)
+ self._service_link.accept_ticket(original_service_ticket)
+ self._invocation_mate.block_until_tickets_satisfy(terminated)
+ self._assert_is_valid_service_sequence(
+ self._invocation_mate.tickets(), (service_payload,),
+ service_initial_metadata, service_terminal_metadata, service_code,
+ service_message, links.Ticket.Termination.COMPLETION)
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/links/test_utilities.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/links/test_utilities.py
new file mode 100644
index 0000000000..39c7f2fc63
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/links/test_utilities.py
@@ -0,0 +1,167 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior appropriate for use in tests."""
+
+import logging
+import threading
+import time
+
+from grpc.framework.interfaces.links import links
+from grpc.framework.interfaces.links import utilities
+
+# A more-or-less arbitrary limit on the length of raw data values to be logged.
+_UNCOMFORTABLY_LONG = 48
+
+
+def _safe_for_log_ticket(ticket):
+ """Creates a safe-for-printing-to-the-log ticket for a given ticket.
+
+ Args:
+ ticket: Any links.Ticket.
+
+ Returns:
+ A links.Ticket that is as much as can be equal to the given ticket but
+ possibly features values like the string "<payload of length 972321>" in
+ place of the actual values of the given ticket.
+ """
+ if isinstance(ticket.payload, (basestring,)):
+ payload_length = len(ticket.payload)
+ else:
+ payload_length = -1
+ if payload_length < _UNCOMFORTABLY_LONG:
+ return ticket
+ else:
+ return links.Ticket(
+ ticket.operation_id, ticket.sequence_number,
+ ticket.group, ticket.method, ticket.subscription, ticket.timeout,
+ ticket.allowance, ticket.initial_metadata,
+ '<payload of length {}>'.format(payload_length),
+ ticket.terminal_metadata, ticket.code, ticket.message,
+ ticket.termination, None)
+
+
+class RecordingLink(links.Link):
+ """A Link that records every ticket passed to it."""
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._tickets = []
+
+ def accept_ticket(self, ticket):
+ with self._condition:
+ self._tickets.append(ticket)
+ self._condition.notify_all()
+
+ def join_link(self, link):
+ pass
+
+ def block_until_tickets_satisfy(self, predicate):
+ """Blocks until the received tickets satisfy the given predicate.
+
+ Args:
+ predicate: A callable that takes a sequence of tickets and returns a
+ boolean value.
+ """
+ with self._condition:
+ while not predicate(self._tickets):
+ self._condition.wait()
+
+ def tickets(self):
+ """Returns a copy of the list of all tickets received by this Link."""
+ with self._condition:
+ return tuple(self._tickets)
+
+
+class _Pipe(object):
+ """A conduit that logs all tickets passed through it."""
+
+ def __init__(self, name):
+ self._lock = threading.Lock()
+ self._name = name
+ self._left_mate = utilities.NULL_LINK
+ self._right_mate = utilities.NULL_LINK
+
+ def accept_left_to_right_ticket(self, ticket):
+ with self._lock:
+ logging.warning(
+ '%s: moving left to right through %s: %s', time.time(), self._name,
+ _safe_for_log_ticket(ticket))
+ try:
+ self._right_mate.accept_ticket(ticket)
+ except Exception as e: # pylint: disable=broad-except
+ logging.exception(e)
+
+ def accept_right_to_left_ticket(self, ticket):
+ with self._lock:
+ logging.warning(
+ '%s: moving right to left through %s: %s', time.time(), self._name,
+ _safe_for_log_ticket(ticket))
+ try:
+ self._left_mate.accept_ticket(ticket)
+ except Exception as e: # pylint: disable=broad-except
+ logging.exception(e)
+
+ def join_left_mate(self, left_mate):
+ with self._lock:
+ self._left_mate = utilities.NULL_LINK if left_mate is None else left_mate
+
+ def join_right_mate(self, right_mate):
+ with self._lock:
+ self._right_mate = (
+ utilities.NULL_LINK if right_mate is None else right_mate)
+
+
+class _Facade(links.Link):
+
+ def __init__(self, accept, join):
+ self._accept = accept
+ self._join = join
+
+ def accept_ticket(self, ticket):
+ self._accept(ticket)
+
+ def join_link(self, link):
+ self._join(link)
+
+
+def logging_links(name):
+ """Creates a conduit that logs all tickets passed through it.
+
+ Args:
+ name: A name to use for the conduit to identify itself in logging output.
+
+ Returns:
+ Two links.Links, the first of which is the "left" side of the conduit
+ and the second of which is the "right" side of the conduit.
+ """
+ pipe = _Pipe(name)
+ left_facade = _Facade(pipe.accept_left_to_right_ticket, pipe.join_left_mate)
+ right_facade = _Facade(pipe.accept_right_to_left_ticket, pipe.join_right_mate)
+ return left_facade, right_facade