aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/_framework/face
diff options
context:
space:
mode:
authorGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2015-01-23 21:48:42 -0800
committerGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2015-01-23 21:48:42 -0800
commite2380338bfc9cec2f44877b3551d4e94490c1daa (patch)
tree52a464f9d3b64fe756098c5368f3ae426e24e466 /src/python/_framework/face
parentcacae338b6cf32b8deedc26141990da78bd03034 (diff)
parent7a3c38bed9ea9e69ec748b9ed08cdf16cb6ac501 (diff)
Merge pull request #193 from nathanielmanistaatgoogle/python-introduction
Bring the rest of Python RPC Framework into GRPC.
Diffstat (limited to 'src/python/_framework/face')
-rw-r--r--src/python/_framework/face/__init__.py0
-rw-r--r--src/python/_framework/face/_calls.py310
-rw-r--r--src/python/_framework/face/_control.py194
-rw-r--r--src/python/_framework/face/_service.py189
-rw-r--r--src/python/_framework/face/_test_case.py81
-rw-r--r--src/python/_framework/face/blocking_invocation_inline_service_test.py46
-rw-r--r--src/python/_framework/face/demonstration.py118
-rw-r--r--src/python/_framework/face/event_invocation_synchronous_event_service_test.py46
-rw-r--r--src/python/_framework/face/exceptions.py77
-rw-r--r--src/python/_framework/face/future_invocation_asynchronous_event_service_test.py46
-rw-r--r--src/python/_framework/face/implementations.py246
-rw-r--r--src/python/_framework/face/interfaces.py545
-rw-r--r--src/python/_framework/face/testing/__init__.py0
-rw-r--r--src/python/_framework/face/testing/base_util.py102
-rw-r--r--src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py223
-rw-r--r--src/python/_framework/face/testing/callback.py94
-rw-r--r--src/python/_framework/face/testing/control.py87
-rw-r--r--src/python/_framework/face/testing/coverage.py123
-rw-r--r--src/python/_framework/face/testing/digest.py446
-rw-r--r--src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py367
-rw-r--r--src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py377
-rw-r--r--src/python/_framework/face/testing/interfaces.py117
-rw-r--r--src/python/_framework/face/testing/serial.py70
-rw-r--r--src/python/_framework/face/testing/service.py337
-rw-r--r--src/python/_framework/face/testing/stock_service.py374
-rw-r--r--src/python/_framework/face/testing/test_case.py111
26 files changed, 4726 insertions, 0 deletions
diff --git a/src/python/_framework/face/__init__.py b/src/python/_framework/face/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/src/python/_framework/face/__init__.py
diff --git a/src/python/_framework/face/_calls.py b/src/python/_framework/face/_calls.py
new file mode 100644
index 0000000000..ab58e6378b
--- /dev/null
+++ b/src/python/_framework/face/_calls.py
@@ -0,0 +1,310 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utility functions for invoking RPCs."""
+
+import threading
+
+from _framework.base import interfaces as base_interfaces
+from _framework.base import util as base_util
+from _framework.face import _control
+from _framework.face import interfaces
+from _framework.foundation import callable_util
+from _framework.foundation import future
+
+_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!'
+_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!'
+
+
+class _RendezvousServicedIngestor(base_interfaces.ServicedIngestor):
+
+ def __init__(self, rendezvous):
+ self._rendezvous = rendezvous
+
+ def consumer(self, operation_context):
+ return self._rendezvous
+
+
+class _EventServicedIngestor(base_interfaces.ServicedIngestor):
+
+ def __init__(self, result_consumer, abortion_callback):
+ self._result_consumer = result_consumer
+ self._abortion_callback = abortion_callback
+
+ def consumer(self, operation_context):
+ operation_context.add_termination_callback(
+ _control.as_operation_termination_callback(self._abortion_callback))
+ return self._result_consumer
+
+
+def _rendezvous_subscription(rendezvous):
+ return base_util.full_serviced_subscription(
+ _RendezvousServicedIngestor(rendezvous))
+
+
+def _unary_event_subscription(completion_callback, abortion_callback):
+ return base_util.full_serviced_subscription(
+ _EventServicedIngestor(
+ _control.UnaryConsumer(completion_callback), abortion_callback))
+
+
+def _stream_event_subscription(result_consumer, abortion_callback):
+ return base_util.full_serviced_subscription(
+ _EventServicedIngestor(result_consumer, abortion_callback))
+
+
+class _OperationCancellableIterator(interfaces.CancellableIterator):
+ """An interfaces.CancellableIterator for response-streaming operations."""
+
+ def __init__(self, rendezvous, operation):
+ self._rendezvous = rendezvous
+ self._operation = operation
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ return next(self._rendezvous)
+
+ def cancel(self):
+ self._operation.cancel()
+ self._rendezvous.set_outcome(base_interfaces.CANCELLED)
+
+
+class _OperationFuture(future.Future):
+ """A future.Future interface to an operation."""
+
+ def __init__(self, rendezvous, operation):
+ self._condition = threading.Condition()
+ self._rendezvous = rendezvous
+ self._operation = operation
+
+ self._outcome = None
+ self._callbacks = []
+
+ def cancel(self):
+ """See future.Future.cancel for specification."""
+ with self._condition:
+ if self._outcome is None:
+ self._operation.cancel()
+ self._outcome = future.aborted()
+ self._condition.notify_all()
+ return False
+
+ def cancelled(self):
+ """See future.Future.cancelled for specification."""
+ return False
+
+ def done(self):
+ """See future.Future.done for specification."""
+ with self._condition:
+ return (self._outcome is not None and
+ self._outcome.category is not future.ABORTED)
+
+ def outcome(self):
+ """See future.Future.outcome for specification."""
+ with self._condition:
+ while self._outcome is None:
+ self._condition.wait()
+ return self._outcome
+
+ def add_done_callback(self, callback):
+ """See future.Future.add_done_callback for specification."""
+ with self._condition:
+ if self._callbacks is not None:
+ self._callbacks.add(callback)
+ return
+
+ outcome = self._outcome
+
+ callable_util.call_logging_exceptions(
+ callback, _DONE_CALLBACK_LOG_MESSAGE, outcome)
+
+ def on_operation_termination(self, operation_outcome):
+ """Indicates to this object that the operation has terminated.
+
+ Args:
+ operation_outcome: One of base_interfaces.COMPLETED,
+ base_interfaces.CANCELLED, base_interfaces.EXPIRED,
+ base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE,
+ base_interfaces.SERVICED_FAILURE, or base_interfaces.SERVICER_FAILURE
+ indicating the categorical outcome of the operation.
+ """
+ with self._condition:
+ if (self._outcome is None and
+ operation_outcome != base_interfaces.COMPLETED):
+ self._outcome = future.raised(
+ _control.abortion_outcome_to_exception(operation_outcome))
+ self._condition.notify_all()
+
+ outcome = self._outcome
+ rendezvous = self._rendezvous
+ callbacks = list(self._callbacks)
+ self._callbacks = None
+
+ if outcome is None:
+ try:
+ return_value = next(rendezvous)
+ except Exception as e: # pylint: disable=broad-except
+ outcome = future.raised(e)
+ else:
+ outcome = future.returned(return_value)
+ with self._condition:
+ if self._outcome is None:
+ self._outcome = outcome
+ self._condition.notify_all()
+ else:
+ outcome = self._outcome
+
+ for callback in callbacks:
+ callable_util.call_logging_exceptions(
+ callback, _DONE_CALLBACK_LOG_MESSAGE, outcome)
+
+
+class _Call(interfaces.Call):
+
+ def __init__(self, operation):
+ self._operation = operation
+ self.context = _control.RpcContext(operation.context)
+
+ def cancel(self):
+ self._operation.cancel()
+
+
+def blocking_value_in_value_out(front, name, payload, timeout, trace_id):
+ """Services in a blocking fashion a value-in value-out servicer method."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(
+ name, payload, True, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ return next(rendezvous)
+
+
+def future_value_in_value_out(front, name, payload, timeout, trace_id):
+ """Services a value-in value-out servicer method by returning a Future."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(
+ name, payload, True, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ operation_future = _OperationFuture(rendezvous, operation)
+ operation.context.add_termination_callback(
+ operation_future.on_operation_termination)
+ return operation_future
+
+
+def inline_value_in_stream_out(front, name, payload, timeout, trace_id):
+ """Services a value-in stream-out servicer method."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(
+ name, payload, True, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ return _OperationCancellableIterator(rendezvous, operation)
+
+
+def blocking_stream_in_value_out(
+ front, name, payload_iterator, timeout, trace_id):
+ """Services in a blocking fashion a stream-in value-out servicer method."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(name, None, False, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ for payload in payload_iterator:
+ operation.consumer.consume(payload)
+ operation.consumer.terminate()
+ return next(rendezvous)
+
+
+def future_stream_in_value_out(
+ front, name, payload_iterator, timeout, trace_id, pool):
+ """Services a stream-in value-out servicer method by returning a Future."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(name, None, False, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ pool.submit(
+ callable_util.with_exceptions_logged(
+ _control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE),
+ payload_iterator, operation.consumer, lambda: True, True)
+ operation_future = _OperationFuture(rendezvous, operation)
+ operation.context.add_termination_callback(
+ operation_future.on_operation_termination)
+ return operation_future
+
+
+def inline_stream_in_stream_out(
+ front, name, payload_iterator, timeout, trace_id, pool):
+ """Services a stream-in stream-out servicer method."""
+ rendezvous = _control.Rendezvous()
+ subscription = _rendezvous_subscription(rendezvous)
+ operation = front.operate(name, None, False, timeout, subscription, trace_id)
+ operation.context.add_termination_callback(rendezvous.set_outcome)
+ pool.submit(
+ callable_util.with_exceptions_logged(
+ _control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE),
+ payload_iterator, operation.consumer, lambda: True, True)
+ return _OperationCancellableIterator(rendezvous, operation)
+
+
+def event_value_in_value_out(
+ front, name, payload, completion_callback, abortion_callback, timeout,
+ trace_id):
+ subscription = _unary_event_subscription(
+ completion_callback, abortion_callback)
+ operation = front.operate(
+ name, payload, True, timeout, subscription, trace_id)
+ return _Call(operation)
+
+
+def event_value_in_stream_out(
+ front, name, payload, result_payload_consumer, abortion_callback, timeout,
+ trace_id):
+ subscription = _stream_event_subscription(
+ result_payload_consumer, abortion_callback)
+ operation = front.operate(
+ name, payload, True, timeout, subscription, trace_id)
+ return _Call(operation)
+
+
+def event_stream_in_value_out(
+ front, name, completion_callback, abortion_callback, timeout, trace_id):
+ subscription = _unary_event_subscription(
+ completion_callback, abortion_callback)
+ operation = front.operate(name, None, False, timeout, subscription, trace_id)
+ return _Call(operation), operation.consumer
+
+
+def event_stream_in_stream_out(
+ front, name, result_payload_consumer, abortion_callback, timeout, trace_id):
+ subscription = _stream_event_subscription(
+ result_payload_consumer, abortion_callback)
+ operation = front.operate(name, None, False, timeout, subscription, trace_id)
+ return _Call(operation), operation.consumer
diff --git a/src/python/_framework/face/_control.py b/src/python/_framework/face/_control.py
new file mode 100644
index 0000000000..2c221321d6
--- /dev/null
+++ b/src/python/_framework/face/_control.py
@@ -0,0 +1,194 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for translating between sync and async control flow."""
+
+import threading
+
+from _framework.base import interfaces as base_interfaces
+from _framework.face import exceptions
+from _framework.face import interfaces
+from _framework.foundation import abandonment
+from _framework.foundation import stream
+
+INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Face) Internal Error! :-('
+
+_OPERATION_OUTCOME_TO_RPC_ABORTION = {
+ base_interfaces.CANCELLED: interfaces.CANCELLED,
+ base_interfaces.EXPIRED: interfaces.EXPIRED,
+ base_interfaces.RECEPTION_FAILURE: interfaces.NETWORK_FAILURE,
+ base_interfaces.TRANSMISSION_FAILURE: interfaces.NETWORK_FAILURE,
+ base_interfaces.SERVICED_FAILURE: interfaces.SERVICED_FAILURE,
+ base_interfaces.SERVICER_FAILURE: interfaces.SERVICER_FAILURE,
+ }
+
+
+def _as_operation_termination_callback(rpc_abortion_callback):
+ def operation_termination_callback(operation_outcome):
+ rpc_abortion = _OPERATION_OUTCOME_TO_RPC_ABORTION.get(
+ operation_outcome, None)
+ if rpc_abortion is not None:
+ rpc_abortion_callback(rpc_abortion)
+ return operation_termination_callback
+
+
+def _abortion_outcome_to_exception(abortion_outcome):
+ if abortion_outcome == base_interfaces.CANCELLED:
+ return exceptions.CancellationError()
+ elif abortion_outcome == base_interfaces.EXPIRED:
+ return exceptions.ExpirationError()
+ elif abortion_outcome == base_interfaces.SERVICER_FAILURE:
+ return exceptions.ServicerError()
+ elif abortion_outcome == base_interfaces.SERVICED_FAILURE:
+ return exceptions.ServicedError()
+ else:
+ return exceptions.NetworkError()
+
+
+class UnaryConsumer(stream.Consumer):
+ """A stream.Consumer that should only ever be passed one value."""
+
+ def __init__(self, on_termination):
+ self._on_termination = on_termination
+ self._value = None
+
+ def consume(self, value):
+ self._value = value
+
+ def terminate(self):
+ self._on_termination(self._value)
+
+ def consume_and_terminate(self, value):
+ self._on_termination(value)
+
+
+class Rendezvous(stream.Consumer):
+ """A rendez-vous with stream.Consumer and iterator interfaces."""
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._values = []
+ self._values_completed = False
+ self._abortion = None
+
+ def consume(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._condition.notify()
+
+ def terminate(self):
+ with self._condition:
+ self._values_completed = True
+ self._condition.notify()
+
+ def consume_and_terminate(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._values_completed = True
+ self._condition.notify()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ with self._condition:
+ while ((self._abortion is None) and
+ (not self._values) and
+ (not self._values_completed)):
+ self._condition.wait()
+ if self._abortion is not None:
+ raise _abortion_outcome_to_exception(self._abortion)
+ elif self._values:
+ return self._values.pop(0)
+ elif self._values_completed:
+ raise StopIteration()
+ else:
+ raise AssertionError('Unreachable code reached!')
+
+ def set_outcome(self, outcome):
+ with self._condition:
+ if outcome != base_interfaces.COMPLETED:
+ self._abortion = outcome
+ self._condition.notify()
+
+
+class RpcContext(interfaces.RpcContext):
+ """A wrapped base_interfaces.OperationContext."""
+
+ def __init__(self, operation_context):
+ self._operation_context = operation_context
+
+ def is_active(self):
+ return self._operation_context.is_active()
+
+ def time_remaining(self):
+ return self._operation_context.time_remaining()
+
+ def add_abortion_callback(self, abortion_callback):
+ self._operation_context.add_termination_callback(
+ _as_operation_termination_callback(abortion_callback))
+
+
+def pipe_iterator_to_consumer(iterator, consumer, active, terminate):
+ """Pipes values emitted from an iterator to a stream.Consumer.
+
+ Args:
+ iterator: An iterator from which values will be emitted.
+ consumer: A stream.Consumer to which values will be passed.
+ active: A no-argument callable that returns True if the work being done by
+ this function is still valid and should not be abandoned and False if the
+ work being done by this function should be abandoned.
+ terminate: A boolean indicating whether or not this function should
+ terminate the given consumer after passing to it all values emitted by the
+ given iterator.
+
+ Raises:
+ abandonment.Abandoned: If this function quits early after seeing False
+ returned by the active function passed to it.
+ Exception: This function raises whatever exceptions are raised by iterating
+ over the given iterator.
+ """
+ for element in iterator:
+ if not active():
+ raise abandonment.Abandoned()
+
+ consumer.consume(element)
+
+ if not active():
+ raise abandonment.Abandoned()
+ if terminate:
+ consumer.terminate()
+
+
+def abortion_outcome_to_exception(abortion_outcome):
+ return _abortion_outcome_to_exception(abortion_outcome)
+
+
+def as_operation_termination_callback(rpc_abortion_callback):
+ return _as_operation_termination_callback(rpc_abortion_callback)
diff --git a/src/python/_framework/face/_service.py b/src/python/_framework/face/_service.py
new file mode 100644
index 0000000000..d758c2f148
--- /dev/null
+++ b/src/python/_framework/face/_service.py
@@ -0,0 +1,189 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Behaviors for servicing RPCs."""
+
+# base_interfaces and interfaces are referenced from specification in this
+# module.
+from _framework.base import interfaces as base_interfaces # pylint: disable=unused-import
+from _framework.face import _control
+from _framework.face import exceptions
+from _framework.face import interfaces # pylint: disable=unused-import
+from _framework.foundation import abandonment
+from _framework.foundation import callable_util
+from _framework.foundation import stream
+from _framework.foundation import stream_util
+
+
+class _ValueInStreamOutConsumer(stream.Consumer):
+ """A stream.Consumer that maps inputs one-to-many onto outputs."""
+
+ def __init__(self, behavior, context, downstream):
+ """Constructor.
+
+ Args:
+ behavior: A callable that takes a single value and an
+ interfaces.RpcContext and returns a generator of arbitrarily many
+ values.
+ context: An interfaces.RpcContext.
+ downstream: A stream.Consumer to which to pass the values generated by the
+ given behavior.
+ """
+ self._behavior = behavior
+ self._context = context
+ self._downstream = downstream
+
+ def consume(self, value):
+ _control.pipe_iterator_to_consumer(
+ self._behavior(value, self._context), self._downstream,
+ self._context.is_active, False)
+
+ def terminate(self):
+ self._downstream.terminate()
+
+ def consume_and_terminate(self, value):
+ _control.pipe_iterator_to_consumer(
+ self._behavior(value, self._context), self._downstream,
+ self._context.is_active, True)
+
+
+def _pool_wrap(behavior, operation_context):
+ """Wraps an operation-related behavior so that it may be called in a pool.
+
+ Args:
+ behavior: A callable related to carrying out an operation.
+ operation_context: A base_interfaces.OperationContext for the operation.
+
+ Returns:
+ A callable that when called carries out the behavior of the given callable
+ and handles whatever exceptions it raises appropriately.
+ """
+ def translation(*args):
+ try:
+ behavior(*args)
+ except (
+ abandonment.Abandoned,
+ exceptions.ExpirationError,
+ exceptions.CancellationError,
+ exceptions.ServicedError,
+ exceptions.NetworkError) as e:
+ if operation_context.is_active():
+ operation_context.fail(e)
+ except Exception as e:
+ operation_context.fail(e)
+ return callable_util.with_exceptions_logged(
+ translation, _control.INTERNAL_ERROR_LOG_MESSAGE)
+
+
+def adapt_inline_value_in_value_out(method):
+ def adaptation(response_consumer, operation_context):
+ rpc_context = _control.RpcContext(operation_context)
+ return stream_util.TransformingConsumer(
+ lambda request: method.service(request, rpc_context), response_consumer)
+ return adaptation
+
+
+def adapt_inline_value_in_stream_out(method):
+ def adaptation(response_consumer, operation_context):
+ rpc_context = _control.RpcContext(operation_context)
+ return _ValueInStreamOutConsumer(
+ method.service, rpc_context, response_consumer)
+ return adaptation
+
+
+def adapt_inline_stream_in_value_out(method, pool):
+ def adaptation(response_consumer, operation_context):
+ rendezvous = _control.Rendezvous()
+ operation_context.add_termination_callback(rendezvous.set_outcome)
+ def in_pool_thread():
+ response_consumer.consume_and_terminate(
+ method.service(rendezvous, _control.RpcContext(operation_context)))
+ pool.submit(_pool_wrap(in_pool_thread, operation_context))
+ return rendezvous
+ return adaptation
+
+
+def adapt_inline_stream_in_stream_out(method, pool):
+ """Adapts an interfaces.InlineStreamInStreamOutMethod for use with Consumers.
+
+ RPCs may be serviced by calling the return value of this function, passing
+ request values to the stream.Consumer returned from that call, and receiving
+ response values from the stream.Consumer passed to that call.
+
+ Args:
+ method: An interfaces.InlineStreamInStreamOutMethod.
+ pool: A thread pool.
+
+ Returns:
+ A callable that takes a stream.Consumer and a
+ base_interfaces.OperationContext and returns a stream.Consumer.
+ """
+ def adaptation(response_consumer, operation_context):
+ rendezvous = _control.Rendezvous()
+ operation_context.add_termination_callback(rendezvous.set_outcome)
+ def in_pool_thread():
+ _control.pipe_iterator_to_consumer(
+ method.service(rendezvous, _control.RpcContext(operation_context)),
+ response_consumer, operation_context.is_active, True)
+ pool.submit(_pool_wrap(in_pool_thread, operation_context))
+ return rendezvous
+ return adaptation
+
+
+def adapt_event_value_in_value_out(method):
+ def adaptation(response_consumer, operation_context):
+ def on_payload(payload):
+ method.service(
+ payload, response_consumer.consume_and_terminate,
+ _control.RpcContext(operation_context))
+ return _control.UnaryConsumer(on_payload)
+ return adaptation
+
+
+def adapt_event_value_in_stream_out(method):
+ def adaptation(response_consumer, operation_context):
+ def on_payload(payload):
+ method.service(
+ payload, response_consumer, _control.RpcContext(operation_context))
+ return _control.UnaryConsumer(on_payload)
+ return adaptation
+
+
+def adapt_event_stream_in_value_out(method):
+ def adaptation(response_consumer, operation_context):
+ rpc_context = _control.RpcContext(operation_context)
+ return method.service(response_consumer.consume_and_terminate, rpc_context)
+ return adaptation
+
+
+def adapt_event_stream_in_stream_out(method):
+ def adaptation(response_consumer, operation_context):
+ return method.service(
+ response_consumer, _control.RpcContext(operation_context))
+ return adaptation
diff --git a/src/python/_framework/face/_test_case.py b/src/python/_framework/face/_test_case.py
new file mode 100644
index 0000000000..50b55c389f
--- /dev/null
+++ b/src/python/_framework/face/_test_case.py
@@ -0,0 +1,81 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Common lifecycle code for in-memory-ticket-exchange Face-layer tests."""
+
+from _framework.face import implementations
+from _framework.face.testing import base_util
+from _framework.face.testing import test_case
+from _framework.foundation import logging_pool
+
+_TIMEOUT = 3
+_MAXIMUM_POOL_SIZE = 100
+
+
+class FaceTestCase(test_case.FaceTestCase):
+ """Provides abstract Face-layer tests an in-memory implementation."""
+
+ def set_up_implementation(
+ self,
+ name,
+ methods,
+ inline_value_in_value_out_methods,
+ inline_value_in_stream_out_methods,
+ inline_stream_in_value_out_methods,
+ inline_stream_in_stream_out_methods,
+ event_value_in_value_out_methods,
+ event_value_in_stream_out_methods,
+ event_stream_in_value_out_methods,
+ event_stream_in_stream_out_methods,
+ multi_method):
+ servicer_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
+ stub_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
+
+ servicer = implementations.servicer(
+ servicer_pool,
+ inline_value_in_value_out_methods=inline_value_in_value_out_methods,
+ inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
+ inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
+ inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
+ event_value_in_value_out_methods=event_value_in_value_out_methods,
+ event_value_in_stream_out_methods=event_value_in_stream_out_methods,
+ event_stream_in_value_out_methods=event_stream_in_value_out_methods,
+ event_stream_in_stream_out_methods=event_stream_in_stream_out_methods,
+ multi_method=multi_method)
+
+ linked_pair = base_util.linked_pair(servicer, _TIMEOUT)
+ server = implementations.server()
+ stub = implementations.stub(linked_pair.front, stub_pool)
+ return server, stub, (servicer_pool, stub_pool, linked_pair)
+
+ def tear_down_implementation(self, memo):
+ servicer_pool, stub_pool, linked_pair = memo
+ linked_pair.shut_down()
+ stub_pool.shutdown(wait=True)
+ servicer_pool.shutdown(wait=True)
diff --git a/src/python/_framework/face/blocking_invocation_inline_service_test.py b/src/python/_framework/face/blocking_invocation_inline_service_test.py
new file mode 100644
index 0000000000..96563c94ee
--- /dev/null
+++ b/src/python/_framework/face/blocking_invocation_inline_service_test.py
@@ -0,0 +1,46 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""One of the tests of the Face layer of RPC Framework."""
+
+import unittest
+
+from _framework.face import _test_case
+from _framework.face.testing import blocking_invocation_inline_service_test_case as test_case
+
+
+class BlockingInvocationInlineServiceTest(
+ _test_case.FaceTestCase,
+ test_case.BlockingInvocationInlineServiceTestCase,
+ unittest.TestCase):
+ pass
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/python/_framework/face/demonstration.py b/src/python/_framework/face/demonstration.py
new file mode 100644
index 0000000000..501ec6b3f8
--- /dev/null
+++ b/src/python/_framework/face/demonstration.py
@@ -0,0 +1,118 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Demonstration-suitable implementation of the face layer of RPC Framework."""
+
+from _framework.base import util as _base_util
+from _framework.base.packets import implementations as _tickets_implementations
+from _framework.face import implementations
+from _framework.foundation import logging_pool
+
+_POOL_SIZE_LIMIT = 20
+
+_MAXIMUM_TIMEOUT = 90
+
+
+class LinkedPair(object):
+ """A Server and Stub that are linked to one another.
+
+ Attributes:
+ server: A Server.
+ stub: A Stub.
+ """
+
+ def shut_down(self):
+ """Shuts down this object and releases its resources."""
+ raise NotImplementedError()
+
+
+class _LinkedPair(LinkedPair):
+
+ def __init__(self, server, stub, front, back, pools):
+ self.server = server
+ self.stub = stub
+ self._front = front
+ self._back = back
+ self._pools = pools
+
+ def shut_down(self):
+ _base_util.wait_for_idle(self._front)
+ _base_util.wait_for_idle(self._back)
+
+ for pool in self._pools:
+ pool.shutdown(wait=True)
+
+
+def server_and_stub(
+ default_timeout,
+ inline_value_in_value_out_methods=None,
+ inline_value_in_stream_out_methods=None,
+ inline_stream_in_value_out_methods=None,
+ inline_stream_in_stream_out_methods=None,
+ event_value_in_value_out_methods=None,
+ event_value_in_stream_out_methods=None,
+ event_stream_in_value_out_methods=None,
+ event_stream_in_stream_out_methods=None,
+ multi_method=None):
+ """Creates a Server and Stub linked together for use."""
+ front_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ front_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ front_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ back_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ back_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ back_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ stub_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ pools = (
+ front_work_pool, front_transmission_pool, front_utility_pool,
+ back_work_pool, back_transmission_pool, back_utility_pool,
+ stub_pool)
+
+ servicer = implementations.servicer(
+ back_work_pool,
+ inline_value_in_value_out_methods=inline_value_in_value_out_methods,
+ inline_value_in_stream_out_methods=inline_value_in_stream_out_methods,
+ inline_stream_in_value_out_methods=inline_stream_in_value_out_methods,
+ inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods,
+ event_value_in_value_out_methods=event_value_in_value_out_methods,
+ event_value_in_stream_out_methods=event_value_in_stream_out_methods,
+ event_stream_in_value_out_methods=event_stream_in_value_out_methods,
+ event_stream_in_stream_out_methods=event_stream_in_stream_out_methods,
+ multi_method=multi_method)
+
+ front = _tickets_implementations.front(
+ front_work_pool, front_transmission_pool, front_utility_pool)
+ back = _tickets_implementations.back(
+ servicer, back_work_pool, back_transmission_pool, back_utility_pool,
+ default_timeout, _MAXIMUM_TIMEOUT)
+ front.join_rear_link(back)
+ back.join_fore_link(front)
+
+ stub = implementations.stub(front, stub_pool)
+
+ return _LinkedPair(implementations.server(), stub, front, back, pools)
diff --git a/src/python/_framework/face/event_invocation_synchronous_event_service_test.py b/src/python/_framework/face/event_invocation_synchronous_event_service_test.py
new file mode 100644
index 0000000000..48e05b2478
--- /dev/null
+++ b/src/python/_framework/face/event_invocation_synchronous_event_service_test.py
@@ -0,0 +1,46 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""One of the tests of the Face layer of RPC Framework."""
+
+import unittest
+
+from _framework.face import _test_case
+from _framework.face.testing import event_invocation_synchronous_event_service_test_case as test_case
+
+
+class EventInvocationSynchronousEventServiceTest(
+ _test_case.FaceTestCase,
+ test_case.EventInvocationSynchronousEventServiceTestCase,
+ unittest.TestCase):
+ pass
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/python/_framework/face/exceptions.py b/src/python/_framework/face/exceptions.py
new file mode 100644
index 0000000000..f112df70bc
--- /dev/null
+++ b/src/python/_framework/face/exceptions.py
@@ -0,0 +1,77 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Exceptions used in the Face layer of RPC Framework."""
+
+import abc
+
+
+class NoSuchMethodError(Exception):
+ """Raised by customer code to indicate an unrecognized RPC method name.
+
+ Attributes:
+ name: The unrecognized name.
+ """
+
+ def __init__(self, name):
+ """Constructor.
+
+ Args:
+ name: The unrecognized RPC method name.
+ """
+ super(NoSuchMethodError, self).__init__()
+ self.name = name
+
+
+class RpcError(Exception):
+ """Common super type for all exceptions raised by the Face layer.
+
+ Only RPC Framework should instantiate and raise these exceptions.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class CancellationError(RpcError):
+ """Indicates that an RPC has been cancelled."""
+
+
+class ExpirationError(RpcError):
+ """Indicates that an RPC has expired ("timed out")."""
+
+
+class NetworkError(RpcError):
+ """Indicates that some error occurred on the network."""
+
+
+class ServicedError(RpcError):
+ """Indicates that the Serviced failed in the course of an RPC."""
+
+
+class ServicerError(RpcError):
+ """Indicates that the Servicer failed in the course of servicing an RPC."""
diff --git a/src/python/_framework/face/future_invocation_asynchronous_event_service_test.py b/src/python/_framework/face/future_invocation_asynchronous_event_service_test.py
new file mode 100644
index 0000000000..96f5fe85d3
--- /dev/null
+++ b/src/python/_framework/face/future_invocation_asynchronous_event_service_test.py
@@ -0,0 +1,46 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""One of the tests of the Face layer of RPC Framework."""
+
+import unittest
+
+from _framework.face import _test_case
+from _framework.face.testing import future_invocation_asynchronous_event_service_test_case as test_case
+
+
+class FutureInvocationAsynchronousEventServiceTest(
+ _test_case.FaceTestCase,
+ test_case.FutureInvocationAsynchronousEventServiceTestCase,
+ unittest.TestCase):
+ pass
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/python/_framework/face/implementations.py b/src/python/_framework/face/implementations.py
new file mode 100644
index 0000000000..94362e2007
--- /dev/null
+++ b/src/python/_framework/face/implementations.py
@@ -0,0 +1,246 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Entry points into the Face layer of RPC Framework."""
+
+from _framework.base import exceptions as _base_exceptions
+from _framework.base import interfaces as base_interfaces
+from _framework.face import _calls
+from _framework.face import _service
+from _framework.face import exceptions
+from _framework.face import interfaces
+
+
+class _BaseServicer(base_interfaces.Servicer):
+
+ def __init__(self, methods, multi_method):
+ self._methods = methods
+ self._multi_method = multi_method
+
+ def service(self, name, context, output_consumer):
+ method = self._methods.get(name, None)
+ if method is not None:
+ return method(output_consumer, context)
+ elif self._multi_method is not None:
+ try:
+ return self._multi_method.service(name, output_consumer, context)
+ except exceptions.NoSuchMethodError:
+ raise _base_exceptions.NoSuchMethodError()
+ else:
+ raise _base_exceptions.NoSuchMethodError()
+
+
+class _Server(interfaces.Server):
+ """An interfaces.Server implementation."""
+
+
+class _Stub(interfaces.Stub):
+ """An interfaces.Stub implementation."""
+
+ def __init__(self, front, pool):
+ self._front = front
+ self._pool = pool
+
+ def blocking_value_in_value_out(self, name, request, timeout):
+ return _calls.blocking_value_in_value_out(
+ self._front, name, request, timeout, 'unused trace ID')
+
+ def future_value_in_value_out(self, name, request, timeout):
+ return _calls.future_value_in_value_out(
+ self._front, name, request, timeout, 'unused trace ID')
+
+ def inline_value_in_stream_out(self, name, request, timeout):
+ return _calls.inline_value_in_stream_out(
+ self._front, name, request, timeout, 'unused trace ID')
+
+ def blocking_stream_in_value_out(self, name, request_iterator, timeout):
+ return _calls.blocking_stream_in_value_out(
+ self._front, name, request_iterator, timeout, 'unused trace ID')
+
+ def future_stream_in_value_out(self, name, request_iterator, timeout):
+ return _calls.future_stream_in_value_out(
+ self._front, name, request_iterator, timeout, 'unused trace ID',
+ self._pool)
+
+ def inline_stream_in_stream_out(self, name, request_iterator, timeout):
+ return _calls.inline_stream_in_stream_out(
+ self._front, name, request_iterator, timeout, 'unused trace ID',
+ self._pool)
+
+ def event_value_in_value_out(
+ self, name, request, response_callback, abortion_callback, timeout):
+ return _calls.event_value_in_value_out(
+ self._front, name, request, response_callback, abortion_callback,
+ timeout, 'unused trace ID')
+
+ def event_value_in_stream_out(
+ self, name, request, response_consumer, abortion_callback, timeout):
+ return _calls.event_value_in_stream_out(
+ self._front, name, request, response_consumer, abortion_callback,
+ timeout, 'unused trace ID')
+
+ def event_stream_in_value_out(
+ self, name, response_callback, abortion_callback, timeout):
+ return _calls.event_stream_in_value_out(
+ self._front, name, response_callback, abortion_callback, timeout,
+ 'unused trace ID')
+
+ def event_stream_in_stream_out(
+ self, name, response_consumer, abortion_callback, timeout):
+ return _calls.event_stream_in_stream_out(
+ self._front, name, response_consumer, abortion_callback, timeout,
+ 'unused trace ID')
+
+
+def _aggregate_methods(
+ pool,
+ inline_value_in_value_out_methods,
+ inline_value_in_stream_out_methods,
+ inline_stream_in_value_out_methods,
+ inline_stream_in_stream_out_methods,
+ event_value_in_value_out_methods,
+ event_value_in_stream_out_methods,
+ event_stream_in_value_out_methods,
+ event_stream_in_stream_out_methods):
+ """Aggregates methods coded in according to different interfaces."""
+ methods = {}
+
+ def adapt_unpooled_methods(adapted_methods, unadapted_methods, adaptation):
+ if unadapted_methods is not None:
+ for name, unadapted_method in unadapted_methods.iteritems():
+ adapted_methods[name] = adaptation(unadapted_method)
+
+ def adapt_pooled_methods(adapted_methods, unadapted_methods, adaptation):
+ if unadapted_methods is not None:
+ for name, unadapted_method in unadapted_methods.iteritems():
+ adapted_methods[name] = adaptation(unadapted_method, pool)
+
+ adapt_unpooled_methods(
+ methods, inline_value_in_value_out_methods,
+ _service.adapt_inline_value_in_value_out)
+ adapt_unpooled_methods(
+ methods, inline_value_in_stream_out_methods,
+ _service.adapt_inline_value_in_stream_out)
+ adapt_pooled_methods(
+ methods, inline_stream_in_value_out_methods,
+ _service.adapt_inline_stream_in_value_out)
+ adapt_pooled_methods(
+ methods, inline_stream_in_stream_out_methods,
+ _service.adapt_inline_stream_in_stream_out)
+ adapt_unpooled_methods(
+ methods, event_value_in_value_out_methods,
+ _service.adapt_event_value_in_value_out)
+ adapt_unpooled_methods(
+ methods, event_value_in_stream_out_methods,
+ _service.adapt_event_value_in_stream_out)
+ adapt_unpooled_methods(
+ methods, event_stream_in_value_out_methods,
+ _service.adapt_event_stream_in_value_out)
+ adapt_unpooled_methods(
+ methods, event_stream_in_stream_out_methods,
+ _service.adapt_event_stream_in_stream_out)
+
+ return methods
+
+
+def servicer(
+ pool,
+ inline_value_in_value_out_methods=None,
+ inline_value_in_stream_out_methods=None,
+ inline_stream_in_value_out_methods=None,
+ inline_stream_in_stream_out_methods=None,
+ event_value_in_value_out_methods=None,
+ event_value_in_stream_out_methods=None,
+ event_stream_in_value_out_methods=None,
+ event_stream_in_stream_out_methods=None,
+ multi_method=None):
+ """Creates a base_interfaces.Servicer.
+
+ The key sets of the passed dictionaries must be disjoint. It is guaranteed
+ that any passed MultiMethod implementation will only be called to service an
+ RPC if the RPC method name is not present in the key sets of the passed
+ dictionaries.
+
+ Args:
+ pool: A thread pool.
+ inline_value_in_value_out_methods: A dictionary mapping method names to
+ interfaces.InlineValueInValueOutMethod implementations.
+ inline_value_in_stream_out_methods: A dictionary mapping method names to
+ interfaces.InlineValueInStreamOutMethod implementations.
+ inline_stream_in_value_out_methods: A dictionary mapping method names to
+ interfaces.InlineStreamInValueOutMethod implementations.
+ inline_stream_in_stream_out_methods: A dictionary mapping method names to
+ interfaces.InlineStreamInStreamOutMethod implementations.
+ event_value_in_value_out_methods: A dictionary mapping method names to
+ interfaces.EventValueInValueOutMethod implementations.
+ event_value_in_stream_out_methods: A dictionary mapping method names to
+ interfaces.EventValueInStreamOutMethod implementations.
+ event_stream_in_value_out_methods: A dictionary mapping method names to
+ interfaces.EventStreamInValueOutMethod implementations.
+ event_stream_in_stream_out_methods: A dictionary mapping method names to
+ interfaces.EventStreamInStreamOutMethod implementations.
+ multi_method: An implementation of interfaces.MultiMethod.
+
+ Returns:
+ A base_interfaces.Servicer that services RPCs via the given implementations.
+ """
+ methods = _aggregate_methods(
+ pool,
+ inline_value_in_value_out_methods,
+ inline_value_in_stream_out_methods,
+ inline_stream_in_value_out_methods,
+ inline_stream_in_stream_out_methods,
+ event_value_in_value_out_methods,
+ event_value_in_stream_out_methods,
+ event_stream_in_value_out_methods,
+ event_stream_in_stream_out_methods)
+
+ return _BaseServicer(methods, multi_method)
+
+
+def server():
+ """Creates an interfaces.Server.
+
+ Returns:
+ An interfaces.Server.
+ """
+ return _Server()
+
+
+def stub(front, pool):
+ """Creates an interfaces.Stub.
+
+ Args:
+ front: A base_interfaces.Front.
+ pool: A futures.ThreadPoolExecutor.
+
+ Returns:
+ An interfaces.Stub that performs RPCs via the given base_interfaces.Front.
+ """
+ return _Stub(front, pool)
diff --git a/src/python/_framework/face/interfaces.py b/src/python/_framework/face/interfaces.py
new file mode 100644
index 0000000000..0cc7c70df3
--- /dev/null
+++ b/src/python/_framework/face/interfaces.py
@@ -0,0 +1,545 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces for the face layer of RPC Framework."""
+
+import abc
+
+# exceptions, abandonment, and future are referenced from specification in this
+# module.
+from _framework.face import exceptions # pylint: disable=unused-import
+from _framework.foundation import abandonment # pylint: disable=unused-import
+from _framework.foundation import future # pylint: disable=unused-import
+
+
+class CancellableIterator(object):
+ """Implements the Iterator protocol and affords a cancel method."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __iter__(self):
+ """Returns the self object in accordance with the Iterator protocol."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def next(self):
+ """Returns a value or raises StopIteration per the Iterator protocol."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Requests cancellation of whatever computation underlies this iterator."""
+ raise NotImplementedError()
+
+
+# Constants that categorize RPC abortion.
+# TODO(nathaniel): Learn and use Python's enum library for this de facto
+# enumerated type
+CANCELLED = 'abortion: cancelled'
+EXPIRED = 'abortion: expired'
+NETWORK_FAILURE = 'abortion: network failure'
+SERVICED_FAILURE = 'abortion: serviced failure'
+SERVICER_FAILURE = 'abortion: servicer failure'
+
+
+class RpcContext(object):
+ """Provides RPC-related information and action."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def is_active(self):
+ """Describes whether the RPC is active or has terminated."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def time_remaining(self):
+ """Describes the length of allowed time remaining for the RPC.
+
+ Returns:
+ A nonnegative float indicating the length of allowed time in seconds
+ remaining for the RPC to complete before it is considered to have timed
+ out.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_abortion_callback(self, abortion_callback):
+ """Registers a callback to be called if the RPC is aborted.
+
+ Args:
+ abortion_callback: A callable to be called and passed one of CANCELLED,
+ EXPIRED, NETWORK_FAILURE, SERVICED_FAILURE, or SERVICER_FAILURE in the
+ event of RPC abortion.
+ """
+ raise NotImplementedError()
+
+
+class InlineValueInValueOutMethod(object):
+ """A type for inline unary-request-unary-response RPC methods."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request, context):
+ """Services an RPC that accepts one value and produces one value.
+
+ Args:
+ request: The single request value for the RPC.
+ context: An RpcContext object.
+
+ Returns:
+ The single response value for the RPC.
+
+ Raises:
+ abandonment.Abandoned: If no response is necessary because the RPC has
+ been aborted.
+ """
+ raise NotImplementedError()
+
+
+class InlineValueInStreamOutMethod(object):
+ """A type for inline unary-request-stream-response RPC methods."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request, context):
+ """Services an RPC that accepts one value and produces a stream of values.
+
+ Args:
+ request: The single request value for the RPC.
+ context: An RpcContext object.
+
+ Yields:
+ The values that comprise the response stream of the RPC.
+
+ Raises:
+ abandonment.Abandoned: If completing the response stream is not necessary
+ because the RPC has been aborted.
+ """
+ raise NotImplementedError()
+
+
+class InlineStreamInValueOutMethod(object):
+ """A type for inline stream-request-unary-response RPC methods."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request_iterator, context):
+ """Services an RPC that accepts a stream of values and produces one value.
+
+ Args:
+ request_iterator: An iterator that yields the request values of the RPC.
+ Drawing values from this iterator may also raise exceptions.RpcError to
+ indicate abortion of the RPC.
+ context: An RpcContext object.
+
+ Yields:
+ The values that comprise the response stream of the RPC.
+
+ Raises:
+ abandonment.Abandoned: If no response is necessary because the RPC has
+ been aborted.
+ exceptions.RpcError: Implementations of this method must not deliberately
+ raise exceptions.RpcError but may allow such errors raised by the
+ request_iterator passed to them to propagate through their bodies
+ uncaught.
+ """
+ raise NotImplementedError()
+
+
+class InlineStreamInStreamOutMethod(object):
+ """A type for inline stream-request-stream-response RPC methods."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request_iterator, context):
+ """Services an RPC that accepts and produces streams of values.
+
+ Args:
+ request_iterator: An iterator that yields the request values of the RPC.
+ Drawing values from this iterator may also raise exceptions.RpcError to
+ indicate abortion of the RPC.
+ context: An RpcContext object.
+
+ Yields:
+ The values that comprise the response stream of the RPC.
+
+ Raises:
+ abandonment.Abandoned: If completing the response stream is not necessary
+ because the RPC has been aborted.
+ exceptions.RpcError: Implementations of this method must not deliberately
+ raise exceptions.RpcError but may allow such errors raised by the
+ request_iterator passed to them to propagate through their bodies
+ uncaught.
+ """
+ raise NotImplementedError()
+
+
+class EventValueInValueOutMethod(object):
+ """A type for event-driven unary-request-unary-response RPC methods."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request, response_callback, context):
+ """Services an RPC that accepts one value and produces one value.
+
+ Args:
+ request: The single request value for the RPC.
+ response_callback: A callback to be called to accept the response value of
+ the RPC.
+ context: An RpcContext object.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class EventValueInStreamOutMethod(object):
+ """A type for event-driven unary-request-stream-response RPC methods."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request, response_consumer, context):
+ """Services an RPC that accepts one value and produces a stream of values.
+
+ Args:
+ request: The single request value for the RPC.
+ response_consumer: A stream.Consumer to be called to accept the response
+ values of the RPC.
+ context: An RpcContext object.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class EventStreamInValueOutMethod(object):
+ """A type for event-driven stream-request-unary-response RPC methods."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, response_callback, context):
+ """Services an RPC that accepts a stream of values and produces one value.
+
+ Args:
+ response_callback: A callback to be called to accept the response value of
+ the RPC.
+ context: An RpcContext object.
+
+ Returns:
+ A stream.Consumer with which to accept the request values of the RPC. The
+ consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing values to this object. Implementations must not assume that this
+ object will be called to completion of the request stream or even called
+ at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class EventStreamInStreamOutMethod(object):
+ """A type for event-driven stream-request-stream-response RPC methods."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, response_consumer, context):
+ """Services an RPC that accepts and produces streams of values.
+
+ Args:
+ response_consumer: A stream.Consumer to be called to accept the response
+ values of the RPC.
+ context: An RpcContext object.
+
+ Returns:
+ A stream.Consumer with which to accept the request values of the RPC. The
+ consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing values to this object. Implementations must not assume that this
+ object will be called to completion of the request stream or even called
+ at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class MultiMethod(object):
+ """A general type able to service many RPC methods."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, name, response_consumer, context):
+ """Services an RPC.
+
+ Args:
+ name: The RPC method name.
+ response_consumer: A stream.Consumer to be called to accept the response
+ values of the RPC.
+ context: An RpcContext object.
+
+ Returns:
+ A stream.Consumer with which to accept the request values of the RPC. The
+ consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing values to this object. Implementations must not assume that this
+ object will be called to completion of the request stream or even called
+ at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ exceptions.NoSuchMethodError: If this MultiMethod does not recognize the
+ given RPC method name and is not able to service the RPC.
+ """
+ raise NotImplementedError()
+
+
+class Server(object):
+ """Specification of a running server that services RPCs."""
+ __metaclass__ = abc.ABCMeta
+
+
+class Call(object):
+ """Invocation-side representation of an RPC.
+
+ Attributes:
+ context: An RpcContext affording information about the RPC.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Requests cancellation of the RPC."""
+ raise NotImplementedError()
+
+
+class Stub(object):
+ """Affords RPC methods to callers."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def blocking_value_in_value_out(self, name, request, timeout):
+ """Invokes a unary-request-unary-response RPC method.
+
+ This method blocks until either returning the response value of the RPC
+ (in the event of RPC completion) or raising an exception (in the event of
+ RPC abortion).
+
+ Args:
+ name: The RPC method name.
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ The response value for the RPC.
+
+ Raises:
+ exceptions.RpcError: Indicating that the RPC was aborted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def future_value_in_value_out(self, name, request, timeout):
+ """Invokes a unary-request-unary-response RPC method.
+
+ Args:
+ name: The RPC method name.
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ A future.Future representing the RPC. In the event of RPC completion, the
+ returned Future will return an outcome indicating that the RPC returned
+ the response value of the RPC. In the event of RPC abortion, the
+ returned Future will return an outcome indicating that the RPC raised
+ an exceptions.RpcError.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def inline_value_in_stream_out(self, name, request, timeout):
+ """Invokes a unary-request-stream-response RPC method.
+
+ Args:
+ name: The RPC method name.
+ request: The request value for the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ A CancellableIterator that yields the response values of the RPC and
+ affords RPC cancellation. Drawing response values from the returned
+ CancellableIterator may raise exceptions.RpcError indicating abortion of
+ the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def blocking_stream_in_value_out(self, name, request_iterator, timeout):
+ """Invokes a stream-request-unary-response RPC method.
+
+ This method blocks until either returning the response value of the RPC
+ (in the event of RPC completion) or raising an exception (in the event of
+ RPC abortion).
+
+ Args:
+ name: The RPC method name.
+ request_iterator: An iterator that yields the request values of the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ The response value for the RPC.
+
+ Raises:
+ exceptions.RpcError: Indicating that the RPC was aborted.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def future_stream_in_value_out(self, name, request_iterator, timeout):
+ """Invokes a stream-request-unary-response RPC method.
+
+ Args:
+ name: The RPC method name.
+ request_iterator: An iterator that yields the request values of the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ A future.Future representing the RPC. In the event of RPC completion, the
+ returned Future will return an outcome indicating that the RPC returned
+ the response value of the RPC. In the event of RPC abortion, the
+ returned Future will return an outcome indicating that the RPC raised
+ an exceptions.RpcError.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def inline_stream_in_stream_out(self, name, request_iterator, timeout):
+ """Invokes a stream-request-stream-response RPC method.
+
+ Args:
+ name: The RPC method name.
+ request_iterator: An iterator that yields the request values of the RPC.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ A CancellableIterator that yields the response values of the RPC and
+ affords RPC cancellation. Drawing response values from the returned
+ CancellableIterator may raise exceptions.RpcError indicating abortion of
+ the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event_value_in_value_out(
+ self, name, request, response_callback, abortion_callback, timeout):
+ """Event-driven invocation of a unary-request-unary-response RPC method.
+
+ Args:
+ name: The RPC method name.
+ request: The request value for the RPC.
+ response_callback: A callback to be called to accept the response value
+ of the RPC.
+ abortion_callback: A callback to be called to accept one of CANCELLED,
+ EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
+ abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ A Call object for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event_value_in_stream_out(
+ self, name, request, response_consumer, abortion_callback, timeout):
+ """Event-driven invocation of a unary-request-stream-response RPC method.
+
+ Args:
+ name: The RPC method name.
+ request: The request value for the RPC.
+ response_consumer: A stream.Consumer to be called to accept the response
+ values of the RPC.
+ abortion_callback: A callback to be called to accept one of CANCELLED,
+ EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
+ abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ A Call object for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event_stream_in_value_out(
+ self, name, response_callback, abortion_callback, timeout):
+ """Event-driven invocation of a unary-request-unary-response RPC method.
+
+ Args:
+ name: The RPC method name.
+ response_callback: A callback to be called to accept the response value
+ of the RPC.
+ abortion_callback: A callback to be called to accept one of CANCELLED,
+ EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
+ abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ A pair of a Call object for the RPC and a stream.Consumer to which the
+ request values of the RPC should be passed.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def event_stream_in_stream_out(
+ self, name, response_consumer, abortion_callback, timeout):
+ """Event-driven invocation of a unary-request-stream-response RPC method.
+
+ Args:
+ name: The RPC method name.
+ response_consumer: A stream.Consumer to be called to accept the response
+ values of the RPC.
+ abortion_callback: A callback to be called to accept one of CANCELLED,
+ EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
+ abortion.
+ timeout: A duration of time in seconds to allow for the RPC.
+
+ Returns:
+ A pair of a Call object for the RPC and a stream.Consumer to which the
+ request values of the RPC should be passed.
+ """
+ raise NotImplementedError()
diff --git a/src/python/_framework/face/testing/__init__.py b/src/python/_framework/face/testing/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/src/python/_framework/face/testing/__init__.py
diff --git a/src/python/_framework/face/testing/base_util.py b/src/python/_framework/face/testing/base_util.py
new file mode 100644
index 0000000000..d9ccb3af8f
--- /dev/null
+++ b/src/python/_framework/face/testing/base_util.py
@@ -0,0 +1,102 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for creating Base-layer objects for use in Face-layer tests."""
+
+import abc
+
+# interfaces is referenced from specification in this module.
+from _framework.base import util as _base_util
+from _framework.base.packets import implementations
+from _framework.base.packets import in_memory
+from _framework.base.packets import interfaces # pylint: disable=unused-import
+from _framework.foundation import logging_pool
+
+_POOL_SIZE_LIMIT = 20
+
+_MAXIMUM_TIMEOUT = 90
+
+
+class LinkedPair(object):
+ """A Front and Back that are linked to one another.
+
+ Attributes:
+ front: An interfaces.Front.
+ back: An interfaces.Back.
+ """
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def shut_down(self):
+ """Shuts down this object and releases its resources."""
+ raise NotImplementedError()
+
+
+class _LinkedPair(LinkedPair):
+
+ def __init__(self, front, back, pools):
+ self.front = front
+ self.back = back
+ self._pools = pools
+
+ def shut_down(self):
+ _base_util.wait_for_idle(self.front)
+ _base_util.wait_for_idle(self.back)
+
+ for pool in self._pools:
+ pool.shutdown(wait=True)
+
+
+def linked_pair(servicer, default_timeout):
+ """Creates a Server and Stub linked together for use."""
+ link_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ front_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ front_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ front_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ back_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ back_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ back_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
+ pools = (
+ link_pool,
+ front_work_pool, front_transmission_pool, front_utility_pool,
+ back_work_pool, back_transmission_pool, back_utility_pool)
+
+ link = in_memory.Link(link_pool)
+ front = implementations.front(
+ front_work_pool, front_transmission_pool, front_utility_pool)
+ back = implementations.back(
+ servicer, back_work_pool, back_transmission_pool, back_utility_pool,
+ default_timeout, _MAXIMUM_TIMEOUT)
+ front.join_rear_link(link)
+ link.join_fore_link(front)
+ back.join_fore_link(link)
+ link.join_rear_link(back)
+
+ return _LinkedPair(front, back, pools)
diff --git a/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py
new file mode 100644
index 0000000000..0b1a2f0bd2
--- /dev/null
+++ b/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py
@@ -0,0 +1,223 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A test to verify an implementation of the Face layer of RPC Framework."""
+
+# unittest is referenced from specification in this module.
+import abc
+import unittest # pylint: disable=unused-import
+
+from _framework.face import exceptions
+from _framework.face.testing import control
+from _framework.face.testing import coverage
+from _framework.face.testing import digest
+from _framework.face.testing import stock_service
+from _framework.face.testing import test_case
+
+_TIMEOUT = 3
+
+
+class BlockingInvocationInlineServiceTestCase(
+ test_case.FaceTestCase, coverage.BlockingCoverage):
+ """A test of the Face layer of RPC Framework.
+
+ Concrete subclasses must also extend unittest.TestCase.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.control = control.PauseFailControl()
+ self.digest = digest.digest(
+ stock_service.STOCK_TEST_SERVICE, self.control, None)
+
+ self.server, self.stub, self.memo = self.set_up_implementation(
+ self.digest.name, self.digest.methods,
+ self.digest.inline_unary_unary_methods,
+ self.digest.inline_unary_stream_methods,
+ self.digest.inline_stream_unary_methods,
+ self.digest.inline_stream_stream_methods,
+ {}, {}, {}, {}, None)
+
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.tear_down_implementation(self.memo)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response = self.stub.blocking_value_in_value_out(
+ name, request, _TIMEOUT)
+
+ test_messages.verify(request, response, self)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ response = self.stub.blocking_stream_in_value_out(
+ name, iter(requests), _TIMEOUT)
+
+ test_messages.verify(requests, response, self)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response = self.stub.blocking_value_in_value_out(
+ name, first_request, _TIMEOUT)
+
+ test_messages.verify(first_request, first_response, self)
+
+ second_response = self.stub.blocking_value_in_value_out(
+ name, second_request, _TIMEOUT)
+
+ test_messages.verify(second_request, second_response, self)
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ self.stub.blocking_value_in_value_out(name, request, _TIMEOUT)
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ list(response_iterator)
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT)
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ list(response_iterator)
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.fail(), self.assertRaises(exceptions.ServicerError):
+ self.stub.blocking_value_in_value_out(name, request, _TIMEOUT)
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.fail(), self.assertRaises(exceptions.ServicerError):
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ list(response_iterator)
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.fail(), self.assertRaises(exceptions.ServicerError):
+ self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT)
+
+ def testFailedStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.fail(), self.assertRaises(exceptions.ServicerError):
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ list(response_iterator)
diff --git a/src/python/_framework/face/testing/callback.py b/src/python/_framework/face/testing/callback.py
new file mode 100644
index 0000000000..7a20869abe
--- /dev/null
+++ b/src/python/_framework/face/testing/callback.py
@@ -0,0 +1,94 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A utility useful in tests of asynchronous, event-driven interfaces."""
+
+import threading
+
+from _framework.foundation import stream
+
+
+class Callback(stream.Consumer):
+ """A utility object useful in tests of asynchronous code."""
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._unary_response = None
+ self._streamed_responses = []
+ self._completed = False
+ self._abortion = None
+
+ def abort(self, abortion):
+ with self._condition:
+ self._abortion = abortion
+ self._condition.notify_all()
+
+ def complete(self, unary_response):
+ with self._condition:
+ self._unary_response = unary_response
+ self._completed = True
+ self._condition.notify_all()
+
+ def consume(self, streamed_response):
+ with self._condition:
+ self._streamed_responses.append(streamed_response)
+
+ def terminate(self):
+ with self._condition:
+ self._completed = True
+ self._condition.notify_all()
+
+ def consume_and_terminate(self, streamed_response):
+ with self._condition:
+ self._streamed_responses.append(streamed_response)
+ self._completed = True
+ self._condition.notify_all()
+
+ def block_until_terminated(self):
+ with self._condition:
+ while self._abortion is None and not self._completed:
+ self._condition.wait()
+
+ def response(self):
+ with self._condition:
+ if self._abortion is None:
+ return self._unary_response
+ else:
+ raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+
+ def responses(self):
+ with self._condition:
+ if self._abortion is None:
+ return list(self._streamed_responses)
+ else:
+ raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+
+ def abortion(self):
+ with self._condition:
+ return self._abortion
diff --git a/src/python/_framework/face/testing/control.py b/src/python/_framework/face/testing/control.py
new file mode 100644
index 0000000000..3960c4e649
--- /dev/null
+++ b/src/python/_framework/face/testing/control.py
@@ -0,0 +1,87 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Code for instructing systems under test to block or fail."""
+
+import abc
+import contextlib
+import threading
+
+
+class Control(object):
+ """An object that accepts program control from a system under test.
+
+ Systems under test passed a Control should call its control() method
+ frequently during execution. The control() method may block, raise an
+ exception, or do nothing, all according to the enclosing test's desire for
+ the system under test to simulate hanging, failing, or functioning.
+ """
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def control(self):
+ """Potentially does anything."""
+ raise NotImplementedError()
+
+
+class PauseFailControl(Control):
+ """A Control that can be used to pause or fail code under control."""
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._paused = False
+ self._fail = False
+
+ def control(self):
+ with self._condition:
+ if self._fail:
+ raise ValueError()
+
+ while self._paused:
+ self._condition.wait()
+
+ @contextlib.contextmanager
+ def pause(self):
+ """Pauses code under control while controlling code is in context."""
+ with self._condition:
+ self._paused = True
+ yield
+ with self._condition:
+ self._paused = False
+ self._condition.notify_all()
+
+ @contextlib.contextmanager
+ def fail(self):
+ """Fails code under control while controlling code is in context."""
+ with self._condition:
+ self._fail = True
+ yield
+ with self._condition:
+ self._fail = False
diff --git a/src/python/_framework/face/testing/coverage.py b/src/python/_framework/face/testing/coverage.py
new file mode 100644
index 0000000000..f3aca113fe
--- /dev/null
+++ b/src/python/_framework/face/testing/coverage.py
@@ -0,0 +1,123 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Governs coverage for the tests of the Face layer of RPC Framework."""
+
+import abc
+
+# These classes are only valid when inherited by unittest.TestCases.
+# pylint: disable=invalid-name
+
+
+class BlockingCoverage(object):
+ """Specification of test coverage for blocking behaviors."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testSuccessfulStreamRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testSequentialInvocations(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testExpiredUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testExpiredUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testExpiredStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testExpiredStreamRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testFailedUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testFailedUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testFailedStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testFailedStreamRequestStreamResponse(self):
+ raise NotImplementedError()
+
+
+class FullCoverage(BlockingCoverage):
+ """Specification of test coverage for non-blocking behaviors."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def testParallelInvocations(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testCancelledUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testCancelledUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testCancelledStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def testCancelledStreamRequestStreamResponse(self):
+ raise NotImplementedError()
diff --git a/src/python/_framework/face/testing/digest.py b/src/python/_framework/face/testing/digest.py
new file mode 100644
index 0000000000..8d1291c975
--- /dev/null
+++ b/src/python/_framework/face/testing/digest.py
@@ -0,0 +1,446 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Code for making a service.TestService more amenable to use in tests."""
+
+import collections
+import threading
+
+# testing_control, interfaces, and testing_service are referenced from
+# specification in this module.
+from _framework.face import exceptions
+from _framework.face import interfaces as face_interfaces
+from _framework.face.testing import control as testing_control # pylint: disable=unused-import
+from _framework.face.testing import interfaces # pylint: disable=unused-import
+from _framework.face.testing import service as testing_service # pylint: disable=unused-import
+from _framework.foundation import stream
+from _framework.foundation import stream_util
+
+_IDENTITY = lambda x: x
+
+
+class TestServiceDigest(
+ collections.namedtuple(
+ 'TestServiceDigest',
+ ['name',
+ 'methods',
+ 'inline_unary_unary_methods',
+ 'inline_unary_stream_methods',
+ 'inline_stream_unary_methods',
+ 'inline_stream_stream_methods',
+ 'event_unary_unary_methods',
+ 'event_unary_stream_methods',
+ 'event_stream_unary_methods',
+ 'event_stream_stream_methods',
+ 'multi_method',
+ 'unary_unary_messages_sequences',
+ 'unary_stream_messages_sequences',
+ 'stream_unary_messages_sequences',
+ 'stream_stream_messages_sequences'])):
+ """A transformation of a service.TestService.
+
+ Attributes:
+ name: The RPC service name to be used in the test.
+ methods: A sequence of interfaces.Method objects describing the RPC
+ methods that will be called during the test.
+ inline_unary_unary_methods: A dict from method name to
+ face_interfaces.InlineValueInValueOutMethod object to be used in tests of
+ in-line calls to behaviors under test.
+ inline_unary_stream_methods: A dict from method name to
+ face_interfaces.InlineValueInStreamOutMethod object to be used in tests of
+ in-line calls to behaviors under test.
+ inline_stream_unary_methods: A dict from method name to
+ face_interfaces.InlineStreamInValueOutMethod object to be used in tests of
+ in-line calls to behaviors under test.
+ inline_stream_stream_methods: A dict from method name to
+ face_interfaces.InlineStreamInStreamOutMethod object to be used in tests
+ of in-line calls to behaviors under test.
+ event_unary_unary_methods: A dict from method name to
+ face_interfaces.EventValueInValueOutMethod object to be used in tests of
+ event-driven calls to behaviors under test.
+ event_unary_stream_methods: A dict from method name to
+ face_interfaces.EventValueInStreamOutMethod object to be used in tests of
+ event-driven calls to behaviors under test.
+ event_stream_unary_methods: A dict from method name to
+ face_interfaces.EventStreamInValueOutMethod object to be used in tests of
+ event-driven calls to behaviors under test.
+ event_stream_stream_methods: A dict from method name to
+ face_interfaces.EventStreamInStreamOutMethod object to be used in tests of
+ event-driven calls to behaviors under test.
+ multi_method: A face_interfaces.MultiMethod to be used in tests of generic
+ calls to behaviors under test.
+ unary_unary_messages_sequences: A dict from method name to sequence of
+ service.UnaryUnaryTestMessages objects to be used to test the method
+ with the given name.
+ unary_stream_messages_sequences: A dict from method name to sequence of
+ service.UnaryStreamTestMessages objects to be used to test the method
+ with the given name.
+ stream_unary_messages_sequences: A dict from method name to sequence of
+ service.StreamUnaryTestMessages objects to be used to test the method
+ with the given name.
+ stream_stream_messages_sequences: A dict from method name to sequence of
+ service.StreamStreamTestMessages objects to be used to test the
+ method with the given name.
+ serialization: A serial.Serialization object describing serialization
+ behaviors for all the RPC methods.
+ """
+
+
+class _BufferingConsumer(stream.Consumer):
+ """A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
+
+ def __init__(self):
+ self.consumed = []
+ self.terminated = False
+
+ def consume(self, value):
+ self.consumed.append(value)
+
+ def terminate(self):
+ self.terminated = True
+
+ def consume_and_terminate(self, value):
+ self.consumed.append(value)
+ self.terminated = True
+
+
+class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod):
+
+ def __init__(self, unary_unary_test_method, control):
+ self._test_method = unary_unary_test_method
+ self._control = control
+
+ def service(self, request, context):
+ response_list = []
+ self._test_method.service(
+ request, response_list.append, context, self._control)
+ return response_list.pop(0)
+
+
+class _EventUnaryUnaryMethod(face_interfaces.EventValueInValueOutMethod):
+
+ def __init__(self, unary_unary_test_method, control, pool):
+ self._test_method = unary_unary_test_method
+ self._control = control
+ self._pool = pool
+
+ def service(self, request, response_callback, context):
+ if self._pool is None:
+ self._test_method.service(
+ request, response_callback, context, self._control)
+ else:
+ self._pool.submit(
+ self._test_method.service, request, response_callback, context,
+ self._control)
+
+
+class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod):
+
+ def __init__(self, unary_stream_test_method, control):
+ self._test_method = unary_stream_test_method
+ self._control = control
+
+ def service(self, request, context):
+ response_consumer = _BufferingConsumer()
+ self._test_method.service(
+ request, response_consumer, context, self._control)
+ for response in response_consumer.consumed:
+ yield response
+
+
+class _EventUnaryStreamMethod(face_interfaces.EventValueInStreamOutMethod):
+
+ def __init__(self, unary_stream_test_method, control, pool):
+ self._test_method = unary_stream_test_method
+ self._control = control
+ self._pool = pool
+
+ def service(self, request, response_consumer, context):
+ if self._pool is None:
+ self._test_method.service(
+ request, response_consumer, context, self._control)
+ else:
+ self._pool.submit(
+ self._test_method.service, request, response_consumer, context,
+ self._control)
+
+
+class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod):
+
+ def __init__(self, stream_unary_test_method, control):
+ self._test_method = stream_unary_test_method
+ self._control = control
+
+ def service(self, request_iterator, context):
+ response_list = []
+ request_consumer = self._test_method.service(
+ response_list.append, context, self._control)
+ for request in request_iterator:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ return response_list.pop(0)
+
+
+class _EventStreamUnaryMethod(face_interfaces.EventStreamInValueOutMethod):
+
+ def __init__(self, stream_unary_test_method, control, pool):
+ self._test_method = stream_unary_test_method
+ self._control = control
+ self._pool = pool
+
+ def service(self, response_callback, context):
+ request_consumer = self._test_method.service(
+ response_callback, context, self._control)
+ if self._pool is None:
+ return request_consumer
+ else:
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod):
+
+ def __init__(self, stream_stream_test_method, control):
+ self._test_method = stream_stream_test_method
+ self._control = control
+
+ def service(self, request_iterator, context):
+ response_consumer = _BufferingConsumer()
+ request_consumer = self._test_method.service(
+ response_consumer, context, self._control)
+
+ for request in request_iterator:
+ request_consumer.consume(request)
+ while response_consumer.consumed:
+ yield response_consumer.consumed.pop(0)
+ response_consumer.terminate()
+
+
+class _EventStreamStreamMethod(face_interfaces.EventStreamInStreamOutMethod):
+
+ def __init__(self, stream_stream_test_method, control, pool):
+ self._test_method = stream_stream_test_method
+ self._control = control
+ self._pool = pool
+
+ def service(self, response_consumer, context):
+ request_consumer = self._test_method.service(
+ response_consumer, context, self._control)
+ if self._pool is None:
+ return request_consumer
+ else:
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _UnaryConsumer(stream.Consumer):
+ """A Consumer that only allows consumption of exactly one value."""
+
+ def __init__(self, action):
+ self._lock = threading.Lock()
+ self._action = action
+ self._consumed = False
+ self._terminated = False
+
+ def consume(self, value):
+ with self._lock:
+ if self._consumed:
+ raise ValueError('Unary consumer already consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._consumed = True
+
+ self._action(value)
+
+ def terminate(self):
+ with self._lock:
+ if not self._consumed:
+ raise ValueError('Unary consumer hasn\'t yet consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._terminated = True
+
+ def consume_and_terminate(self, value):
+ with self._lock:
+ if self._consumed:
+ raise ValueError('Unary consumer already consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._consumed = True
+ self._terminated = True
+
+ self._action(value)
+
+
+class _UnaryUnaryAdaptation(object):
+
+ def __init__(self, unary_unary_test_method):
+ self._method = unary_unary_test_method
+
+ def service(self, response_consumer, context, control):
+ def action(request):
+ self._method.service(
+ request, response_consumer.consume_and_terminate, context, control)
+ return _UnaryConsumer(action)
+
+
+class _UnaryStreamAdaptation(object):
+
+ def __init__(self, unary_stream_test_method):
+ self._method = unary_stream_test_method
+
+ def service(self, response_consumer, context, control):
+ def action(request):
+ self._method.service(request, response_consumer, context, control)
+ return _UnaryConsumer(action)
+
+
+class _StreamUnaryAdaptation(object):
+
+ def __init__(self, stream_unary_test_method):
+ self._method = stream_unary_test_method
+
+ def service(self, response_consumer, context, control):
+ return self._method.service(
+ response_consumer.consume_and_terminate, context, control)
+
+
+class _MultiMethod(face_interfaces.MultiMethod):
+
+ def __init__(self, methods, control, pool):
+ self._methods = methods
+ self._control = control
+ self._pool = pool
+
+ def service(self, name, response_consumer, context):
+ method = self._methods.get(name, None)
+ if method is None:
+ raise exceptions.NoSuchMethodError(name)
+ elif self._pool is None:
+ return method(response_consumer, context, self._control)
+ else:
+ request_consumer = method(response_consumer, context, self._control)
+ return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+
+
+class _Assembly(
+ collections.namedtuple(
+ '_Assembly',
+ ['methods', 'inlines', 'events', 'adaptations', 'messages'])):
+ """An intermediate structure created when creating a TestServiceDigest."""
+
+
+def _assemble(
+ scenarios, names, inline_method_constructor, event_method_constructor,
+ adapter, control, pool):
+ """Creates an _Assembly from the given scenarios."""
+ methods = []
+ inlines = {}
+ events = {}
+ adaptations = {}
+ messages = {}
+ for name, scenario in scenarios.iteritems():
+ if name in names:
+ raise ValueError('Repeated name "%s"!' % name)
+
+ test_method = scenario[0]
+ inline_method = inline_method_constructor(test_method, control)
+ event_method = event_method_constructor(test_method, control, pool)
+ adaptation = adapter(test_method)
+
+ methods.append(test_method)
+ inlines[name] = inline_method
+ events[name] = event_method
+ adaptations[name] = adaptation
+ messages[name] = scenario[1]
+
+ return _Assembly(methods, inlines, events, adaptations, messages)
+
+
+def digest(service, control, pool):
+ """Creates a TestServiceDigest from a TestService.
+
+ Args:
+ service: A testing_service.TestService.
+ control: A testing_control.Control.
+ pool: If RPC methods should be serviced in a separate thread, a thread pool.
+ None if RPC methods should be serviced in the thread belonging to the
+ run-time that calls for their service.
+
+ Returns:
+ A TestServiceDigest synthesized from the given service.TestService.
+ """
+ names = set()
+
+ unary_unary = _assemble(
+ service.unary_unary_scenarios(), names, _InlineUnaryUnaryMethod,
+ _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool)
+ names.update(set(unary_unary.inlines))
+
+ unary_stream = _assemble(
+ service.unary_stream_scenarios(), names, _InlineUnaryStreamMethod,
+ _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool)
+ names.update(set(unary_stream.inlines))
+
+ stream_unary = _assemble(
+ service.stream_unary_scenarios(), names, _InlineStreamUnaryMethod,
+ _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool)
+ names.update(set(stream_unary.inlines))
+
+ stream_stream = _assemble(
+ service.stream_stream_scenarios(), names, _InlineStreamStreamMethod,
+ _EventStreamStreamMethod, _IDENTITY, control, pool)
+ names.update(set(stream_stream.inlines))
+
+ methods = list(unary_unary.methods)
+ methods.extend(unary_stream.methods)
+ methods.extend(stream_unary.methods)
+ methods.extend(stream_stream.methods)
+ adaptations = dict(unary_unary.adaptations)
+ adaptations.update(unary_stream.adaptations)
+ adaptations.update(stream_unary.adaptations)
+ adaptations.update(stream_stream.adaptations)
+
+ return TestServiceDigest(
+ service.name(),
+ methods,
+ unary_unary.inlines,
+ unary_stream.inlines,
+ stream_unary.inlines,
+ stream_stream.inlines,
+ unary_unary.events,
+ unary_stream.events,
+ stream_unary.events,
+ stream_stream.events,
+ _MultiMethod(adaptations, control, pool),
+ unary_unary.messages,
+ unary_stream.messages,
+ stream_unary.messages,
+ stream_stream.messages)
diff --git a/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py
new file mode 100644
index 0000000000..dba73a9368
--- /dev/null
+++ b/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py
@@ -0,0 +1,367 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A test to verify an implementation of the Face layer of RPC Framework."""
+
+import abc
+import unittest
+
+from _framework.face import interfaces
+from _framework.face.testing import callback as testing_callback
+from _framework.face.testing import control
+from _framework.face.testing import coverage
+from _framework.face.testing import digest
+from _framework.face.testing import stock_service
+from _framework.face.testing import test_case
+
+_TIMEOUT = 3
+
+
+class EventInvocationSynchronousEventServiceTestCase(
+ test_case.FaceTestCase, coverage.FullCoverage):
+ """A test of the Face layer of RPC Framework.
+
+ Concrete subclasses must also extend unittest.TestCase.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.control = control.PauseFailControl()
+ self.digest = digest.digest(
+ stock_service.STOCK_TEST_SERVICE, self.control, None)
+
+ self.server, self.stub, self.memo = self.set_up_implementation(
+ self.digest.name, self.digest.methods,
+ {}, {}, {}, {},
+ self.digest.event_unary_unary_methods,
+ self.digest.event_unary_stream_methods,
+ self.digest.event_stream_unary_methods,
+ self.digest.event_stream_stream_methods,
+ None)
+
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.tear_down_implementation(self.memo)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ self.stub.event_value_in_value_out(
+ name, request, callback.complete, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+ response = callback.response()
+
+ test_messages.verify(request, response, self)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ self.stub.event_value_in_stream_out(
+ name, request, callback, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+ responses = callback.responses()
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ unused_call, request_consumer = self.stub.event_stream_in_value_out(
+ name, callback.complete, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ callback.block_until_terminated()
+ response = callback.response()
+
+ test_messages.verify(requests, response, self)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ unused_call, request_consumer = self.stub.event_stream_in_stream_out(
+ name, callback, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ callback.block_until_terminated()
+ responses = callback.responses()
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ # pylint: disable=cell-var-from-loop
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+ first_callback = testing_callback.Callback()
+ second_callback = testing_callback.Callback()
+
+ def make_second_invocation(first_response):
+ first_callback.complete(first_response)
+ self.stub.event_value_in_value_out(
+ name, second_request, second_callback.complete,
+ second_callback.abort, _TIMEOUT)
+
+ self.stub.event_value_in_value_out(
+ name, first_request, make_second_invocation, first_callback.abort,
+ _TIMEOUT)
+ second_callback.block_until_terminated()
+
+ first_response = first_callback.response()
+ second_response = second_callback.response()
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ with self.control.pause():
+ self.stub.event_value_in_value_out(
+ name, request, callback.complete, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.EXPIRED, callback.abortion())
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ with self.control.pause():
+ self.stub.event_value_in_stream_out(
+ name, request, callback, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.EXPIRED, callback.abortion())
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for unused_test_messages in test_messages_sequence:
+ callback = testing_callback.Callback()
+
+ self.stub.event_stream_in_value_out(
+ name, callback.complete, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.EXPIRED, callback.abortion())
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ unused_call, request_consumer = self.stub.event_stream_in_stream_out(
+ name, callback, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.EXPIRED, callback.abortion())
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ with self.control.fail():
+ self.stub.event_value_in_value_out(
+ name, request, callback.complete, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ with self.control.fail():
+ self.stub.event_value_in_stream_out(
+ name, request, callback, callback.abort, _TIMEOUT)
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ with self.control.fail():
+ unused_call, request_consumer = self.stub.event_stream_in_value_out(
+ name, callback.complete, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+
+ def testFailedStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ with self.control.fail():
+ unused_call, request_consumer = self.stub.event_stream_in_stream_out(
+ name, callback, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+
+ def testParallelInvocations(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ first_callback = testing_callback.Callback()
+ second_request = test_messages.request()
+ second_callback = testing_callback.Callback()
+
+ self.stub.event_value_in_value_out(
+ name, first_request, first_callback.complete, first_callback.abort,
+ _TIMEOUT)
+ self.stub.event_value_in_value_out(
+ name, second_request, second_callback.complete,
+ second_callback.abort, _TIMEOUT)
+ first_callback.block_until_terminated()
+ second_callback.block_until_terminated()
+
+ first_response = first_callback.response()
+ second_response = second_callback.response()
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
+ @unittest.skip('TODO(nathaniel): implement.')
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ raise NotImplementedError()
+
+ def testCancelledUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ with self.control.pause():
+ call = self.stub.event_value_in_value_out(
+ name, request, callback.complete, callback.abort, _TIMEOUT)
+ call.cancel()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.CANCELLED, callback.abortion())
+
+ def testCancelledUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = testing_callback.Callback()
+
+ call = self.stub.event_value_in_stream_out(
+ name, request, callback, callback.abort, _TIMEOUT)
+ call.cancel()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.CANCELLED, callback.abortion())
+
+ def testCancelledStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = testing_callback.Callback()
+
+ call, request_consumer = self.stub.event_stream_in_value_out(
+ name, callback.complete, callback.abort, _TIMEOUT)
+ for request in requests:
+ request_consumer.consume(request)
+ call.cancel()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.CANCELLED, callback.abortion())
+
+ def testCancelledStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for unused_test_messages in test_messages_sequence:
+ callback = testing_callback.Callback()
+
+ call, unused_request_consumer = self.stub.event_stream_in_stream_out(
+ name, callback, callback.abort, _TIMEOUT)
+ call.cancel()
+ callback.block_until_terminated()
+
+ self.assertEqual(interfaces.CANCELLED, callback.abortion())
diff --git a/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
new file mode 100644
index 0000000000..cf8b2eeb95
--- /dev/null
+++ b/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
@@ -0,0 +1,377 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""A test to verify an implementation of the Face layer of RPC Framework."""
+
+import abc
+import contextlib
+import threading
+import unittest
+
+from _framework.face import exceptions
+from _framework.face.testing import control
+from _framework.face.testing import coverage
+from _framework.face.testing import digest
+from _framework.face.testing import stock_service
+from _framework.face.testing import test_case
+from _framework.foundation import future
+from _framework.foundation import logging_pool
+
+_TIMEOUT = 3
+_MAXIMUM_POOL_SIZE = 100
+
+
+class _PauseableIterator(object):
+
+ def __init__(self, upstream):
+ self._upstream = upstream
+ self._condition = threading.Condition()
+ self._paused = False
+
+ @contextlib.contextmanager
+ def pause(self):
+ with self._condition:
+ self._paused = True
+ yield
+ with self._condition:
+ self._paused = False
+ self._condition.notify_all()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ with self._condition:
+ while self._paused:
+ self._condition.wait()
+ return next(self._upstream)
+
+
+class FutureInvocationAsynchronousEventServiceTestCase(
+ test_case.FaceTestCase, coverage.FullCoverage):
+ """A test of the Face layer of RPC Framework.
+
+ Concrete subclasses must also extend unittest.TestCase.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.control = control.PauseFailControl()
+ self.digest_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
+ self.digest = digest.digest(
+ stock_service.STOCK_TEST_SERVICE, self.control, self.digest_pool)
+
+ self.server, self.stub, self.memo = self.set_up_implementation(
+ self.digest.name, self.digest.methods,
+ {}, {}, {}, {},
+ self.digest.event_unary_unary_methods,
+ self.digest.event_unary_stream_methods,
+ self.digest.event_stream_unary_methods,
+ self.digest.event_stream_stream_methods,
+ None)
+
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
+
+ Overriding implementations must call this implementation.
+ """
+ self.tear_down_implementation(self.memo)
+ self.digest_pool.shutdown(wait=True)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_future = self.stub.future_value_in_value_out(
+ name, request, _TIMEOUT)
+ response = response_future.outcome().return_value
+
+ test_messages.verify(request, response, self)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ request_iterator = _PauseableIterator(iter(requests))
+
+ # Use of a paused iterator of requests allows us to test that control is
+ # returned to calling code before the iterator yields any requests.
+ with request_iterator.pause():
+ response_future = self.stub.future_stream_in_value_out(
+ name, request_iterator, _TIMEOUT)
+ response = response_future.outcome().return_value
+
+ test_messages.verify(requests, response, self)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ request_iterator = _PauseableIterator(iter(requests))
+
+ # Use of a paused iterator of requests allows us to test that control is
+ # returned to calling code before the iterator yields any requests.
+ with request_iterator.pause():
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, request_iterator, _TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self.stub.future_value_in_value_out(
+ name, first_request, _TIMEOUT)
+ first_response = first_response_future.outcome().return_value
+
+ test_messages.verify(first_request, first_response, self)
+
+ second_response_future = self.stub.future_value_in_value_out(
+ name, second_request, _TIMEOUT)
+ second_response = second_response_future.outcome().return_value
+
+ test_messages.verify(second_request, second_response, self)
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause():
+ response_future = self.stub.future_value_in_value_out(
+ name, request, _TIMEOUT)
+ outcome = response_future.outcome()
+
+ self.assertIsInstance(
+ outcome.exception, exceptions.ExpirationError)
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ list(response_iterator)
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause():
+ response_future = self.stub.future_stream_in_value_out(
+ name, iter(requests), _TIMEOUT)
+ outcome = response_future.outcome()
+
+ self.assertIsInstance(
+ outcome.exception, exceptions.ExpirationError)
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause(), self.assertRaises(
+ exceptions.ExpirationError):
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ list(response_iterator)
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.fail():
+ response_future = self.stub.future_value_in_value_out(
+ name, request, _TIMEOUT)
+ outcome = response_future.outcome()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_callback before the
+ # expiration of the RPC.
+ self.assertIsInstance(outcome.exception, exceptions.ExpirationError)
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_consumer before the
+ # expiration of the RPC.
+ with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ list(response_iterator)
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.fail():
+ response_future = self.stub.future_stream_in_value_out(
+ name, iter(requests), _TIMEOUT)
+ outcome = response_future.outcome()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_callback before the
+ # expiration of the RPC.
+ self.assertIsInstance(outcome.exception, exceptions.ExpirationError)
+
+ def testFailedStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_consumer before the
+ # expiration of the RPC.
+ with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ list(response_iterator)
+
+ def testParallelInvocations(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self.stub.future_value_in_value_out(
+ name, first_request, _TIMEOUT)
+ second_response_future = self.stub.future_value_in_value_out(
+ name, second_request, _TIMEOUT)
+ first_response = first_response_future.outcome().return_value
+ second_response = second_response_future.outcome().return_value
+
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
+ @unittest.skip('TODO(nathaniel): implement.')
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ raise NotImplementedError()
+
+ def testCancelledUnaryRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause():
+ response_future = self.stub.future_value_in_value_out(
+ name, request, _TIMEOUT)
+ cancelled = response_future.cancel()
+
+ self.assertFalse(cancelled)
+ self.assertEqual(future.ABORTED, response_future.outcome().category)
+
+ def testCancelledUnaryRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.unary_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self.control.pause():
+ response_iterator = self.stub.inline_value_in_stream_out(
+ name, request, _TIMEOUT)
+ response_iterator.cancel()
+
+ with self.assertRaises(exceptions.CancellationError):
+ next(response_iterator)
+
+ def testCancelledStreamRequestUnaryResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause():
+ response_future = self.stub.future_stream_in_value_out(
+ name, iter(requests), _TIMEOUT)
+ cancelled = response_future.cancel()
+
+ self.assertFalse(cancelled)
+ self.assertEqual(future.ABORTED, response_future.outcome().category)
+
+ def testCancelledStreamRequestStreamResponse(self):
+ for name, test_messages_sequence in (
+ self.digest.stream_stream_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self.control.pause():
+ response_iterator = self.stub.inline_stream_in_stream_out(
+ name, iter(requests), _TIMEOUT)
+ response_iterator.cancel()
+
+ with self.assertRaises(exceptions.CancellationError):
+ next(response_iterator)
diff --git a/src/python/_framework/face/testing/interfaces.py b/src/python/_framework/face/testing/interfaces.py
new file mode 100644
index 0000000000..253f6f118d
--- /dev/null
+++ b/src/python/_framework/face/testing/interfaces.py
@@ -0,0 +1,117 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces implemented by data sets used in Face-layer tests."""
+
+import abc
+
+# cardinality is referenced from specification in this module.
+from _framework.common import cardinality # pylint: disable=unused-import
+
+
+class Method(object):
+ """An RPC method to be used in tests of RPC implementations."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def name(self):
+ """Identify the name of the method.
+
+ Returns:
+ The name of the method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cardinality(self):
+ """Identify the cardinality of the method.
+
+ Returns:
+ A cardinality.Cardinality value describing the streaming semantics of the
+ method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def request_class(self):
+ """Identify the class used for the method's request objects.
+
+ Returns:
+ The class object of the class to which the method's request objects
+ belong.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def response_class(self):
+ """Identify the class used for the method's response objects.
+
+ Returns:
+ The class object of the class to which the method's response objects
+ belong.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_request(self, request):
+ """Serialize the given request object.
+
+ Args:
+ request: A request object appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_request(self, serialized_request):
+ """Synthesize a request object from a given bytestring.
+
+ Args:
+ serialized_request: A bytestring deserializable into a request object
+ appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def serialize_response(self, response):
+ """Serialize the given response object.
+
+ Args:
+ response: A response object appropriate for this method.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deserialize_response(self, serialized_response):
+ """Synthesize a response object from a given bytestring.
+
+ Args:
+ serialized_response: A bytestring deserializable into a response object
+ appropriate for this method.
+ """
+ raise NotImplementedError()
diff --git a/src/python/_framework/face/testing/serial.py b/src/python/_framework/face/testing/serial.py
new file mode 100644
index 0000000000..47fc5822de
--- /dev/null
+++ b/src/python/_framework/face/testing/serial.py
@@ -0,0 +1,70 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utility for serialization in the context of test RPC services."""
+
+import collections
+
+
+class Serialization(
+ collections.namedtuple(
+ '_Serialization',
+ ['request_serializers',
+ 'request_deserializers',
+ 'response_serializers',
+ 'response_deserializers'])):
+ """An aggregation of serialization behaviors for an RPC service.
+
+ Attributes:
+ request_serializers: A dict from method name to request object serializer
+ behavior.
+ request_deserializers: A dict from method name to request object
+ deserializer behavior.
+ response_serializers: A dict from method name to response object serializer
+ behavior.
+ response_deserializers: A dict from method name to response object
+ deserializer behavior.
+ """
+
+
+def serialization(methods):
+ """Creates a Serialization from a sequences of interfaces.Method objects."""
+ request_serializers = {}
+ request_deserializers = {}
+ response_serializers = {}
+ response_deserializers = {}
+ for method in methods:
+ name = method.name()
+ request_serializers[name] = method.serialize_request
+ request_deserializers[name] = method.deserialize_request
+ response_serializers[name] = method.serialize_response
+ response_deserializers[name] = method.deserialize_response
+ return Serialization(
+ request_serializers, request_deserializers, response_serializers,
+ response_deserializers)
diff --git a/src/python/_framework/face/testing/service.py b/src/python/_framework/face/testing/service.py
new file mode 100644
index 0000000000..771346ec2e
--- /dev/null
+++ b/src/python/_framework/face/testing/service.py
@@ -0,0 +1,337 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Private interfaces implemented by data sets used in Face-layer tests."""
+
+import abc
+
+# interfaces is referenced from specification in this module.
+from _framework.face import interfaces as face_interfaces # pylint: disable=unused-import
+from _framework.face.testing import interfaces
+
+
+class UnaryUnaryTestMethod(interfaces.Method):
+ """Like face_interfaces.EventValueInValueOutMethod but with a control."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request, response_callback, context, control):
+ """Services an RPC that accepts one message and produces one message.
+
+ Args:
+ request: The single request message for the RPC.
+ response_callback: A callback to be called to accept the response message
+ of the RPC.
+ context: An face_interfaces.RpcContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class UnaryUnaryTestMessages(object):
+ """A type for unary-request-unary-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def request(self):
+ """Affords a request message.
+
+ Implementations of this method should return a different message with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A request message.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, request, response, test_case):
+ """Verifies that the computed response matches the given request.
+
+ Args:
+ request: A request message.
+ response: A response message.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the request and response do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamTestMethod(interfaces.Method):
+ """Like face_interfaces.EventValueInStreamOutMethod but with a control."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, request, response_consumer, context, control):
+ """Services an RPC that takes one message and produces a stream of messages.
+
+ Args:
+ request: The single request message for the RPC.
+ response_consumer: A stream.Consumer to be called to accept the response
+ messages of the RPC.
+ context: An RpcContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamTestMessages(object):
+ """A type for unary-request-stream-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def request(self):
+ """Affords a request message.
+
+ Implementations of this method should return a different message with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A request message.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, request, responses, test_case):
+ """Verifies that the computed responses match the given request.
+
+ Args:
+ request: A request message.
+ responses: A sequence of response messages.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the request and responses do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryTestMethod(interfaces.Method):
+ """Like face_interfaces.EventStreamInValueOutMethod but with a control."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, response_callback, context, control):
+ """Services an RPC that takes a stream of messages and produces one message.
+
+ Args:
+ response_callback: A callback to be called to accept the response message
+ of the RPC.
+ context: An RpcContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Returns:
+ A stream.Consumer with which to accept the request messages of the RPC.
+ The consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing messages to this object. Implementations must not assume that
+ this object will be called to completion of the request stream or even
+ called at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryTestMessages(object):
+ """A type for stream-request-unary-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def requests(self):
+ """Affords a sequence of request messages.
+
+ Implementations of this method should return a different sequences with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A sequence of request messages.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, requests, response, test_case):
+ """Verifies that the computed response matches the given requests.
+
+ Args:
+ requests: A sequence of request messages.
+ response: A response message.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the requests and response do not match, indicating that
+ there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamTestMethod(interfaces.Method):
+ """Like face_interfaces.EventStreamInStreamOutMethod but with a control."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def service(self, response_consumer, context, control):
+ """Services an RPC that accepts and produces streams of messages.
+
+ Args:
+ response_consumer: A stream.Consumer to be called to accept the response
+ messages of the RPC.
+ context: An RpcContext object.
+ control: A test_control.Control to control execution of this method.
+
+ Returns:
+ A stream.Consumer with which to accept the request messages of the RPC.
+ The consumer returned from this method may or may not be invoked to
+ completion: in the case of RPC abortion, RPC Framework will simply stop
+ passing messages to this object. Implementations must not assume that
+ this object will be called to completion of the request stream or even
+ called at all.
+
+ Raises:
+ abandonment.Abandoned: May or may not be raised when the RPC has been
+ aborted.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamTestMessages(object):
+ """A type for stream-request-stream-response message pairings."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def requests(self):
+ """Affords a sequence of request messages.
+
+ Implementations of this method should return a different sequences with each
+ call so that multiple test executions of the test method may be made with
+ different inputs.
+
+ Returns:
+ A sequence of request messages.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def verify(self, requests, responses, test_case):
+ """Verifies that the computed response matches the given requests.
+
+ Args:
+ requests: A sequence of request messages.
+ responses: A sequence of response messages.
+ test_case: A unittest.TestCase object affording useful assertion methods.
+
+ Raises:
+ AssertionError: If the requests and responses do not match, indicating
+ that there was some problem executing the RPC under test.
+ """
+ raise NotImplementedError()
+
+
+class TestService(object):
+ """A specification of implemented RPC methods to use in tests."""
+
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def name(self):
+ """Identifies the RPC service name used during the test.
+
+ Returns:
+ The RPC service name to be used for the test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_unary_scenarios(self):
+ """Affords unary-request-unary-response test methods and their messages.
+
+ Returns:
+ A dict from method name to pair. The first element of the pair
+ is a UnaryUnaryTestMethod object and the second element is a sequence
+ of UnaryUnaryTestMethodMessages objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_stream_scenarios(self):
+ """Affords unary-request-stream-response test methods and their messages.
+
+ Returns:
+ A dict from method name to pair. The first element of the pair is a
+ UnaryStreamTestMethod object and the second element is a sequence of
+ UnaryStreamTestMethodMessages objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_unary_scenarios(self):
+ """Affords stream-request-unary-response test methods and their messages.
+
+ Returns:
+ A dict from method name to pair. The first element of the pair is a
+ StreamUnaryTestMethod object and the second element is a sequence of
+ StreamUnaryTestMethodMessages objects.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_stream_scenarios(self):
+ """Affords stream-request-stream-response test methods and their messages.
+
+ Returns:
+ A dict from method name to pair. The first element of the pair is a
+ StreamStreamTestMethod object and the second element is a sequence of
+ StreamStreamTestMethodMessages objects.
+ """
+ raise NotImplementedError()
diff --git a/src/python/_framework/face/testing/stock_service.py b/src/python/_framework/face/testing/stock_service.py
new file mode 100644
index 0000000000..bd82877e83
--- /dev/null
+++ b/src/python/_framework/face/testing/stock_service.py
@@ -0,0 +1,374 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Examples of Python implementations of the stock.proto Stock service."""
+
+from _framework.common import cardinality
+from _framework.face.testing import service
+from _framework.foundation import abandonment
+from _framework.foundation import stream
+from _framework.foundation import stream_util
+from _junkdrawer import stock_pb2
+
+SYMBOL_FORMAT = 'test symbol:%03d'
+STREAM_LENGTH = 400
+
+# A test-appropriate security-pricing function. :-P
+_price = lambda symbol_name: float(hash(symbol_name) % 4096)
+
+
+def _get_last_trade_price(stock_request, stock_reply_callback, control, active):
+ """A unary-request, unary-response test method."""
+ control.control()
+ if active():
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=_price(stock_request.symbol)))
+ else:
+ raise abandonment.Abandoned()
+
+
+def _get_last_trade_price_multiple(stock_reply_consumer, control, active):
+ """A stream-request, stream-response test method."""
+ def stock_reply_for_stock_request(stock_request):
+ control.control()
+ if active():
+ return stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=_price(stock_request.symbol))
+ else:
+ raise abandonment.Abandoned()
+ return stream_util.TransformingConsumer(
+ stock_reply_for_stock_request, stock_reply_consumer)
+
+
+def _watch_future_trades(stock_request, stock_reply_consumer, control, active):
+ """A unary-request, stream-response test method."""
+ base_price = _price(stock_request.symbol)
+ for index in range(stock_request.num_trades_to_watch):
+ control.control()
+ if active():
+ stock_reply_consumer.consume(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=base_price + index))
+ else:
+ raise abandonment.Abandoned()
+ stock_reply_consumer.terminate()
+
+
+def _get_highest_trade_price(stock_reply_callback, control, active):
+ """A stream-request, unary-response test method."""
+
+ class StockRequestConsumer(stream.Consumer):
+ """Keeps an ongoing record of the most valuable symbol yet consumed."""
+
+ def __init__(self):
+ self._symbol = None
+ self._price = None
+
+ def consume(self, stock_request):
+ control.control()
+ if active():
+ if self._price is None:
+ self._symbol = stock_request.symbol
+ self._price = _price(stock_request.symbol)
+ else:
+ candidate_price = _price(stock_request.symbol)
+ if self._price < candidate_price:
+ self._symbol = stock_request.symbol
+ self._price = candidate_price
+
+ def terminate(self):
+ control.control()
+ if active():
+ if self._symbol is None:
+ raise ValueError()
+ else:
+ stock_reply_callback(
+ stock_pb2.StockReply(symbol=self._symbol, price=self._price))
+ self._symbol = None
+ self._price = None
+
+ def consume_and_terminate(self, stock_request):
+ control.control()
+ if active():
+ if self._price is None:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol,
+ price=_price(stock_request.symbol)))
+ else:
+ candidate_price = _price(stock_request.symbol)
+ if self._price < candidate_price:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=candidate_price))
+ else:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=self._symbol, price=self._price))
+
+ self._symbol = None
+ self._price = None
+
+ return StockRequestConsumer()
+
+
+class GetLastTradePrice(service.UnaryUnaryTestMethod):
+ """GetLastTradePrice for use in tests."""
+
+ def name(self):
+ return 'GetLastTradePrice'
+
+ def cardinality(self):
+ return cardinality.Cardinality.UNARY_UNARY
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, request, response_callback, context, control):
+ _get_last_trade_price(
+ request, response_callback, control, context.is_active)
+
+
+class GetLastTradePriceMessages(service.UnaryUnaryTestMessages):
+
+ def __init__(self):
+ self._index = 0
+
+ def request(self):
+ symbol = SYMBOL_FORMAT % self._index
+ self._index += 1
+ return stock_pb2.StockRequest(symbol=symbol)
+
+ def verify(self, request, response, test_case):
+ test_case.assertEqual(request.symbol, response.symbol)
+ test_case.assertEqual(_price(request.symbol), response.price)
+
+
+class GetLastTradePriceMultiple(service.StreamStreamTestMethod):
+ """GetLastTradePriceMultiple for use in tests."""
+
+ def name(self):
+ return 'GetLastTradePriceMultiple'
+
+ def cardinality(self):
+ return cardinality.Cardinality.STREAM_STREAM
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, response_consumer, context, control):
+ return _get_last_trade_price_multiple(
+ response_consumer, control, context.is_active)
+
+
+class GetLastTradePriceMultipleMessages(service.StreamStreamTestMessages):
+ """Pairs of message streams for use with GetLastTradePriceMultiple."""
+
+ def __init__(self):
+ self._index = 0
+
+ def requests(self):
+ base_index = self._index
+ self._index += 1
+ return [
+ stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % (base_index + index))
+ for index in range(STREAM_LENGTH)]
+
+ def verify(self, requests, responses, test_case):
+ test_case.assertEqual(len(requests), len(responses))
+ for stock_request, stock_reply in zip(requests, responses):
+ test_case.assertEqual(stock_request.symbol, stock_reply.symbol)
+ test_case.assertEqual(_price(stock_request.symbol), stock_reply.price)
+
+
+class WatchFutureTrades(service.UnaryStreamTestMethod):
+ """WatchFutureTrades for use in tests."""
+
+ def name(self):
+ return 'WatchFutureTrades'
+
+ def cardinality(self):
+ return cardinality.Cardinality.UNARY_STREAM
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, request, response_consumer, context, control):
+ _watch_future_trades(request, response_consumer, control, context.is_active)
+
+
+class WatchFutureTradesMessages(service.UnaryStreamTestMessages):
+ """Pairs of a single request message and a sequence of response messages."""
+
+ def __init__(self):
+ self._index = 0
+
+ def request(self):
+ symbol = SYMBOL_FORMAT % self._index
+ self._index += 1
+ return stock_pb2.StockRequest(
+ symbol=symbol, num_trades_to_watch=STREAM_LENGTH)
+
+ def verify(self, request, responses, test_case):
+ test_case.assertEqual(STREAM_LENGTH, len(responses))
+ base_price = _price(request.symbol)
+ for index, response in enumerate(responses):
+ test_case.assertEqual(base_price + index, response.price)
+
+
+class GetHighestTradePrice(service.StreamUnaryTestMethod):
+ """GetHighestTradePrice for use in tests."""
+
+ def name(self):
+ return 'GetHighestTradePrice'
+
+ def cardinality(self):
+ return cardinality.Cardinality.STREAM_UNARY
+
+ def request_class(self):
+ return stock_pb2.StockRequest
+
+ def response_class(self):
+ return stock_pb2.StockReply
+
+ def serialize_request(self, request):
+ return request.SerializeToString()
+
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
+
+ def serialize_response(self, response):
+ return response.SerializeToString()
+
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
+
+ def service(self, response_callback, context, control):
+ return _get_highest_trade_price(
+ response_callback, control, context.is_active)
+
+
+class GetHighestTradePriceMessages(service.StreamUnaryTestMessages):
+
+ def requests(self):
+ return [
+ stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % index)
+ for index in range(STREAM_LENGTH)]
+
+ def verify(self, requests, response, test_case):
+ price = None
+ symbol = None
+ for stock_request in requests:
+ current_symbol = stock_request.symbol
+ current_price = _price(current_symbol)
+ if price is None or price < current_price:
+ price = current_price
+ symbol = current_symbol
+ test_case.assertEqual(price, response.price)
+ test_case.assertEqual(symbol, response.symbol)
+
+
+class StockTestService(service.TestService):
+ """A corpus of test data with one method of each RPC cardinality."""
+
+ def name(self):
+ return 'Stock'
+
+ def unary_unary_scenarios(self):
+ return {
+ 'GetLastTradePrice': (
+ GetLastTradePrice(), [GetLastTradePriceMessages()]),
+ }
+
+ def unary_stream_scenarios(self):
+ return {
+ 'WatchFutureTrades': (
+ WatchFutureTrades(), [WatchFutureTradesMessages()]),
+ }
+
+ def stream_unary_scenarios(self):
+ return {
+ 'GetHighestTradePrice': (
+ GetHighestTradePrice(), [GetHighestTradePriceMessages()])
+ }
+
+ def stream_stream_scenarios(self):
+ return {
+ 'GetLastTradePriceMultiple': (
+ GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]),
+ }
+
+
+STOCK_TEST_SERVICE = StockTestService()
diff --git a/src/python/_framework/face/testing/test_case.py b/src/python/_framework/face/testing/test_case.py
new file mode 100644
index 0000000000..09b5a67f5a
--- /dev/null
+++ b/src/python/_framework/face/testing/test_case.py
@@ -0,0 +1,111 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tools for creating tests of implementations of the Face layer."""
+
+import abc
+
+# face_interfaces and interfaces are referenced in specification in this module.
+from _framework.face import interfaces as face_interfaces # pylint: disable=unused-import
+from _framework.face.testing import interfaces # pylint: disable=unused-import
+
+
+class FaceTestCase(object):
+ """Describes a test of the Face Layer of RPC Framework.
+
+ Concrete subclasses must also inherit from unittest.TestCase and from at least
+ one class that defines test methods.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_up_implementation(
+ self,
+ name,
+ methods,
+ inline_value_in_value_out_methods,
+ inline_value_in_stream_out_methods,
+ inline_stream_in_value_out_methods,
+ inline_stream_in_stream_out_methods,
+ event_value_in_value_out_methods,
+ event_value_in_stream_out_methods,
+ event_stream_in_value_out_methods,
+ event_stream_in_stream_out_methods,
+ multi_method):
+ """Instantiates the Face Layer implementation under test.
+
+ Args:
+ name: The service name to be used in the test.
+ methods: A sequence of interfaces.Method objects describing the RPC
+ methods that will be called during the test.
+ inline_value_in_value_out_methods: A dictionary from string method names
+ to face_interfaces.InlineValueInValueOutMethod implementations of those
+ methods.
+ inline_value_in_stream_out_methods: A dictionary from string method names
+ to face_interfaces.InlineValueInStreamOutMethod implementations of those
+ methods.
+ inline_stream_in_value_out_methods: A dictionary from string method names
+ to face_interfaces.InlineStreamInValueOutMethod implementations of those
+ methods.
+ inline_stream_in_stream_out_methods: A dictionary from string method names
+ to face_interfaces.InlineStreamInStreamOutMethod implementations of
+ those methods.
+ event_value_in_value_out_methods: A dictionary from string method names
+ to face_interfaces.EventValueInValueOutMethod implementations of those
+ methods.
+ event_value_in_stream_out_methods: A dictionary from string method names
+ to face_interfaces.EventValueInStreamOutMethod implementations of those
+ methods.
+ event_stream_in_value_out_methods: A dictionary from string method names
+ to face_interfaces.EventStreamInValueOutMethod implementations of those
+ methods.
+ event_stream_in_stream_out_methods: A dictionary from string method names
+ to face_interfaces.EventStreamInStreamOutMethod implementations of those
+ methods.
+ multi_method: An face_interfaces.MultiMethod, or None.
+
+ Returns:
+ A sequence of length three the first element of which is a
+ face_interfaces.Server, the second element of which is a
+ face_interfaces.Stub, (both of which are backed by the given method
+ implementations), and the third element of which is an arbitrary memo
+ object to be kept and passed to tearDownImplementation at the conclusion
+ of the test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def tear_down_implementation(self, memo):
+ """Destroys the Face layer implementation under test.
+
+ Args:
+ memo: The object from the third position of the return value of
+ set_up_implementation.
+ """
+ raise NotImplementedError()