diff options
author | Nicolas Noble <nicolasnoble@users.noreply.github.com> | 2015-01-23 21:48:42 -0800 |
---|---|---|
committer | Nicolas Noble <nicolasnoble@users.noreply.github.com> | 2015-01-23 21:48:42 -0800 |
commit | e2380338bfc9cec2f44877b3551d4e94490c1daa (patch) | |
tree | 52a464f9d3b64fe756098c5368f3ae426e24e466 /src/python/_framework/face | |
parent | cacae338b6cf32b8deedc26141990da78bd03034 (diff) | |
parent | 7a3c38bed9ea9e69ec748b9ed08cdf16cb6ac501 (diff) |
Merge pull request #193 from nathanielmanistaatgoogle/python-introduction
Bring the rest of Python RPC Framework into GRPC.
Diffstat (limited to 'src/python/_framework/face')
26 files changed, 4726 insertions, 0 deletions
diff --git a/src/python/_framework/face/__init__.py b/src/python/_framework/face/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/_framework/face/__init__.py diff --git a/src/python/_framework/face/_calls.py b/src/python/_framework/face/_calls.py new file mode 100644 index 0000000000..ab58e6378b --- /dev/null +++ b/src/python/_framework/face/_calls.py @@ -0,0 +1,310 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utility functions for invoking RPCs.""" + +import threading + +from _framework.base import interfaces as base_interfaces +from _framework.base import util as base_util +from _framework.face import _control +from _framework.face import interfaces +from _framework.foundation import callable_util +from _framework.foundation import future + +_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!' +_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!' + + +class _RendezvousServicedIngestor(base_interfaces.ServicedIngestor): + + def __init__(self, rendezvous): + self._rendezvous = rendezvous + + def consumer(self, operation_context): + return self._rendezvous + + +class _EventServicedIngestor(base_interfaces.ServicedIngestor): + + def __init__(self, result_consumer, abortion_callback): + self._result_consumer = result_consumer + self._abortion_callback = abortion_callback + + def consumer(self, operation_context): + operation_context.add_termination_callback( + _control.as_operation_termination_callback(self._abortion_callback)) + return self._result_consumer + + +def _rendezvous_subscription(rendezvous): + return base_util.full_serviced_subscription( + _RendezvousServicedIngestor(rendezvous)) + + +def _unary_event_subscription(completion_callback, abortion_callback): + return base_util.full_serviced_subscription( + _EventServicedIngestor( + _control.UnaryConsumer(completion_callback), abortion_callback)) + + +def _stream_event_subscription(result_consumer, abortion_callback): + return base_util.full_serviced_subscription( + _EventServicedIngestor(result_consumer, abortion_callback)) + + +class _OperationCancellableIterator(interfaces.CancellableIterator): + """An interfaces.CancellableIterator for response-streaming operations.""" + + def __init__(self, rendezvous, operation): + self._rendezvous = rendezvous + self._operation = operation + + def __iter__(self): + return self + + def next(self): + return next(self._rendezvous) + + def cancel(self): + self._operation.cancel() + self._rendezvous.set_outcome(base_interfaces.CANCELLED) + + +class _OperationFuture(future.Future): + """A future.Future interface to an operation.""" + + def __init__(self, rendezvous, operation): + self._condition = threading.Condition() + self._rendezvous = rendezvous + self._operation = operation + + self._outcome = None + self._callbacks = [] + + def cancel(self): + """See future.Future.cancel for specification.""" + with self._condition: + if self._outcome is None: + self._operation.cancel() + self._outcome = future.aborted() + self._condition.notify_all() + return False + + def cancelled(self): + """See future.Future.cancelled for specification.""" + return False + + def done(self): + """See future.Future.done for specification.""" + with self._condition: + return (self._outcome is not None and + self._outcome.category is not future.ABORTED) + + def outcome(self): + """See future.Future.outcome for specification.""" + with self._condition: + while self._outcome is None: + self._condition.wait() + return self._outcome + + def add_done_callback(self, callback): + """See future.Future.add_done_callback for specification.""" + with self._condition: + if self._callbacks is not None: + self._callbacks.add(callback) + return + + outcome = self._outcome + + callable_util.call_logging_exceptions( + callback, _DONE_CALLBACK_LOG_MESSAGE, outcome) + + def on_operation_termination(self, operation_outcome): + """Indicates to this object that the operation has terminated. + + Args: + operation_outcome: One of base_interfaces.COMPLETED, + base_interfaces.CANCELLED, base_interfaces.EXPIRED, + base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE, + base_interfaces.SERVICED_FAILURE, or base_interfaces.SERVICER_FAILURE + indicating the categorical outcome of the operation. + """ + with self._condition: + if (self._outcome is None and + operation_outcome != base_interfaces.COMPLETED): + self._outcome = future.raised( + _control.abortion_outcome_to_exception(operation_outcome)) + self._condition.notify_all() + + outcome = self._outcome + rendezvous = self._rendezvous + callbacks = list(self._callbacks) + self._callbacks = None + + if outcome is None: + try: + return_value = next(rendezvous) + except Exception as e: # pylint: disable=broad-except + outcome = future.raised(e) + else: + outcome = future.returned(return_value) + with self._condition: + if self._outcome is None: + self._outcome = outcome + self._condition.notify_all() + else: + outcome = self._outcome + + for callback in callbacks: + callable_util.call_logging_exceptions( + callback, _DONE_CALLBACK_LOG_MESSAGE, outcome) + + +class _Call(interfaces.Call): + + def __init__(self, operation): + self._operation = operation + self.context = _control.RpcContext(operation.context) + + def cancel(self): + self._operation.cancel() + + +def blocking_value_in_value_out(front, name, payload, timeout, trace_id): + """Services in a blocking fashion a value-in value-out servicer method.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate( + name, payload, True, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + return next(rendezvous) + + +def future_value_in_value_out(front, name, payload, timeout, trace_id): + """Services a value-in value-out servicer method by returning a Future.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate( + name, payload, True, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + operation_future = _OperationFuture(rendezvous, operation) + operation.context.add_termination_callback( + operation_future.on_operation_termination) + return operation_future + + +def inline_value_in_stream_out(front, name, payload, timeout, trace_id): + """Services a value-in stream-out servicer method.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate( + name, payload, True, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + return _OperationCancellableIterator(rendezvous, operation) + + +def blocking_stream_in_value_out( + front, name, payload_iterator, timeout, trace_id): + """Services in a blocking fashion a stream-in value-out servicer method.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate(name, None, False, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + for payload in payload_iterator: + operation.consumer.consume(payload) + operation.consumer.terminate() + return next(rendezvous) + + +def future_stream_in_value_out( + front, name, payload_iterator, timeout, trace_id, pool): + """Services a stream-in value-out servicer method by returning a Future.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate(name, None, False, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + pool.submit( + callable_util.with_exceptions_logged( + _control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE), + payload_iterator, operation.consumer, lambda: True, True) + operation_future = _OperationFuture(rendezvous, operation) + operation.context.add_termination_callback( + operation_future.on_operation_termination) + return operation_future + + +def inline_stream_in_stream_out( + front, name, payload_iterator, timeout, trace_id, pool): + """Services a stream-in stream-out servicer method.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate(name, None, False, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + pool.submit( + callable_util.with_exceptions_logged( + _control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE), + payload_iterator, operation.consumer, lambda: True, True) + return _OperationCancellableIterator(rendezvous, operation) + + +def event_value_in_value_out( + front, name, payload, completion_callback, abortion_callback, timeout, + trace_id): + subscription = _unary_event_subscription( + completion_callback, abortion_callback) + operation = front.operate( + name, payload, True, timeout, subscription, trace_id) + return _Call(operation) + + +def event_value_in_stream_out( + front, name, payload, result_payload_consumer, abortion_callback, timeout, + trace_id): + subscription = _stream_event_subscription( + result_payload_consumer, abortion_callback) + operation = front.operate( + name, payload, True, timeout, subscription, trace_id) + return _Call(operation) + + +def event_stream_in_value_out( + front, name, completion_callback, abortion_callback, timeout, trace_id): + subscription = _unary_event_subscription( + completion_callback, abortion_callback) + operation = front.operate(name, None, False, timeout, subscription, trace_id) + return _Call(operation), operation.consumer + + +def event_stream_in_stream_out( + front, name, result_payload_consumer, abortion_callback, timeout, trace_id): + subscription = _stream_event_subscription( + result_payload_consumer, abortion_callback) + operation = front.operate(name, None, False, timeout, subscription, trace_id) + return _Call(operation), operation.consumer diff --git a/src/python/_framework/face/_control.py b/src/python/_framework/face/_control.py new file mode 100644 index 0000000000..2c221321d6 --- /dev/null +++ b/src/python/_framework/face/_control.py @@ -0,0 +1,194 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for translating between sync and async control flow.""" + +import threading + +from _framework.base import interfaces as base_interfaces +from _framework.face import exceptions +from _framework.face import interfaces +from _framework.foundation import abandonment +from _framework.foundation import stream + +INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Face) Internal Error! :-(' + +_OPERATION_OUTCOME_TO_RPC_ABORTION = { + base_interfaces.CANCELLED: interfaces.CANCELLED, + base_interfaces.EXPIRED: interfaces.EXPIRED, + base_interfaces.RECEPTION_FAILURE: interfaces.NETWORK_FAILURE, + base_interfaces.TRANSMISSION_FAILURE: interfaces.NETWORK_FAILURE, + base_interfaces.SERVICED_FAILURE: interfaces.SERVICED_FAILURE, + base_interfaces.SERVICER_FAILURE: interfaces.SERVICER_FAILURE, + } + + +def _as_operation_termination_callback(rpc_abortion_callback): + def operation_termination_callback(operation_outcome): + rpc_abortion = _OPERATION_OUTCOME_TO_RPC_ABORTION.get( + operation_outcome, None) + if rpc_abortion is not None: + rpc_abortion_callback(rpc_abortion) + return operation_termination_callback + + +def _abortion_outcome_to_exception(abortion_outcome): + if abortion_outcome == base_interfaces.CANCELLED: + return exceptions.CancellationError() + elif abortion_outcome == base_interfaces.EXPIRED: + return exceptions.ExpirationError() + elif abortion_outcome == base_interfaces.SERVICER_FAILURE: + return exceptions.ServicerError() + elif abortion_outcome == base_interfaces.SERVICED_FAILURE: + return exceptions.ServicedError() + else: + return exceptions.NetworkError() + + +class UnaryConsumer(stream.Consumer): + """A stream.Consumer that should only ever be passed one value.""" + + def __init__(self, on_termination): + self._on_termination = on_termination + self._value = None + + def consume(self, value): + self._value = value + + def terminate(self): + self._on_termination(self._value) + + def consume_and_terminate(self, value): + self._on_termination(value) + + +class Rendezvous(stream.Consumer): + """A rendez-vous with stream.Consumer and iterator interfaces.""" + + def __init__(self): + self._condition = threading.Condition() + self._values = [] + self._values_completed = False + self._abortion = None + + def consume(self, value): + with self._condition: + self._values.append(value) + self._condition.notify() + + def terminate(self): + with self._condition: + self._values_completed = True + self._condition.notify() + + def consume_and_terminate(self, value): + with self._condition: + self._values.append(value) + self._values_completed = True + self._condition.notify() + + def __iter__(self): + return self + + def next(self): + with self._condition: + while ((self._abortion is None) and + (not self._values) and + (not self._values_completed)): + self._condition.wait() + if self._abortion is not None: + raise _abortion_outcome_to_exception(self._abortion) + elif self._values: + return self._values.pop(0) + elif self._values_completed: + raise StopIteration() + else: + raise AssertionError('Unreachable code reached!') + + def set_outcome(self, outcome): + with self._condition: + if outcome != base_interfaces.COMPLETED: + self._abortion = outcome + self._condition.notify() + + +class RpcContext(interfaces.RpcContext): + """A wrapped base_interfaces.OperationContext.""" + + def __init__(self, operation_context): + self._operation_context = operation_context + + def is_active(self): + return self._operation_context.is_active() + + def time_remaining(self): + return self._operation_context.time_remaining() + + def add_abortion_callback(self, abortion_callback): + self._operation_context.add_termination_callback( + _as_operation_termination_callback(abortion_callback)) + + +def pipe_iterator_to_consumer(iterator, consumer, active, terminate): + """Pipes values emitted from an iterator to a stream.Consumer. + + Args: + iterator: An iterator from which values will be emitted. + consumer: A stream.Consumer to which values will be passed. + active: A no-argument callable that returns True if the work being done by + this function is still valid and should not be abandoned and False if the + work being done by this function should be abandoned. + terminate: A boolean indicating whether or not this function should + terminate the given consumer after passing to it all values emitted by the + given iterator. + + Raises: + abandonment.Abandoned: If this function quits early after seeing False + returned by the active function passed to it. + Exception: This function raises whatever exceptions are raised by iterating + over the given iterator. + """ + for element in iterator: + if not active(): + raise abandonment.Abandoned() + + consumer.consume(element) + + if not active(): + raise abandonment.Abandoned() + if terminate: + consumer.terminate() + + +def abortion_outcome_to_exception(abortion_outcome): + return _abortion_outcome_to_exception(abortion_outcome) + + +def as_operation_termination_callback(rpc_abortion_callback): + return _as_operation_termination_callback(rpc_abortion_callback) diff --git a/src/python/_framework/face/_service.py b/src/python/_framework/face/_service.py new file mode 100644 index 0000000000..d758c2f148 --- /dev/null +++ b/src/python/_framework/face/_service.py @@ -0,0 +1,189 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Behaviors for servicing RPCs.""" + +# base_interfaces and interfaces are referenced from specification in this +# module. +from _framework.base import interfaces as base_interfaces # pylint: disable=unused-import +from _framework.face import _control +from _framework.face import exceptions +from _framework.face import interfaces # pylint: disable=unused-import +from _framework.foundation import abandonment +from _framework.foundation import callable_util +from _framework.foundation import stream +from _framework.foundation import stream_util + + +class _ValueInStreamOutConsumer(stream.Consumer): + """A stream.Consumer that maps inputs one-to-many onto outputs.""" + + def __init__(self, behavior, context, downstream): + """Constructor. + + Args: + behavior: A callable that takes a single value and an + interfaces.RpcContext and returns a generator of arbitrarily many + values. + context: An interfaces.RpcContext. + downstream: A stream.Consumer to which to pass the values generated by the + given behavior. + """ + self._behavior = behavior + self._context = context + self._downstream = downstream + + def consume(self, value): + _control.pipe_iterator_to_consumer( + self._behavior(value, self._context), self._downstream, + self._context.is_active, False) + + def terminate(self): + self._downstream.terminate() + + def consume_and_terminate(self, value): + _control.pipe_iterator_to_consumer( + self._behavior(value, self._context), self._downstream, + self._context.is_active, True) + + +def _pool_wrap(behavior, operation_context): + """Wraps an operation-related behavior so that it may be called in a pool. + + Args: + behavior: A callable related to carrying out an operation. + operation_context: A base_interfaces.OperationContext for the operation. + + Returns: + A callable that when called carries out the behavior of the given callable + and handles whatever exceptions it raises appropriately. + """ + def translation(*args): + try: + behavior(*args) + except ( + abandonment.Abandoned, + exceptions.ExpirationError, + exceptions.CancellationError, + exceptions.ServicedError, + exceptions.NetworkError) as e: + if operation_context.is_active(): + operation_context.fail(e) + except Exception as e: + operation_context.fail(e) + return callable_util.with_exceptions_logged( + translation, _control.INTERNAL_ERROR_LOG_MESSAGE) + + +def adapt_inline_value_in_value_out(method): + def adaptation(response_consumer, operation_context): + rpc_context = _control.RpcContext(operation_context) + return stream_util.TransformingConsumer( + lambda request: method.service(request, rpc_context), response_consumer) + return adaptation + + +def adapt_inline_value_in_stream_out(method): + def adaptation(response_consumer, operation_context): + rpc_context = _control.RpcContext(operation_context) + return _ValueInStreamOutConsumer( + method.service, rpc_context, response_consumer) + return adaptation + + +def adapt_inline_stream_in_value_out(method, pool): + def adaptation(response_consumer, operation_context): + rendezvous = _control.Rendezvous() + operation_context.add_termination_callback(rendezvous.set_outcome) + def in_pool_thread(): + response_consumer.consume_and_terminate( + method.service(rendezvous, _control.RpcContext(operation_context))) + pool.submit(_pool_wrap(in_pool_thread, operation_context)) + return rendezvous + return adaptation + + +def adapt_inline_stream_in_stream_out(method, pool): + """Adapts an interfaces.InlineStreamInStreamOutMethod for use with Consumers. + + RPCs may be serviced by calling the return value of this function, passing + request values to the stream.Consumer returned from that call, and receiving + response values from the stream.Consumer passed to that call. + + Args: + method: An interfaces.InlineStreamInStreamOutMethod. + pool: A thread pool. + + Returns: + A callable that takes a stream.Consumer and a + base_interfaces.OperationContext and returns a stream.Consumer. + """ + def adaptation(response_consumer, operation_context): + rendezvous = _control.Rendezvous() + operation_context.add_termination_callback(rendezvous.set_outcome) + def in_pool_thread(): + _control.pipe_iterator_to_consumer( + method.service(rendezvous, _control.RpcContext(operation_context)), + response_consumer, operation_context.is_active, True) + pool.submit(_pool_wrap(in_pool_thread, operation_context)) + return rendezvous + return adaptation + + +def adapt_event_value_in_value_out(method): + def adaptation(response_consumer, operation_context): + def on_payload(payload): + method.service( + payload, response_consumer.consume_and_terminate, + _control.RpcContext(operation_context)) + return _control.UnaryConsumer(on_payload) + return adaptation + + +def adapt_event_value_in_stream_out(method): + def adaptation(response_consumer, operation_context): + def on_payload(payload): + method.service( + payload, response_consumer, _control.RpcContext(operation_context)) + return _control.UnaryConsumer(on_payload) + return adaptation + + +def adapt_event_stream_in_value_out(method): + def adaptation(response_consumer, operation_context): + rpc_context = _control.RpcContext(operation_context) + return method.service(response_consumer.consume_and_terminate, rpc_context) + return adaptation + + +def adapt_event_stream_in_stream_out(method): + def adaptation(response_consumer, operation_context): + return method.service( + response_consumer, _control.RpcContext(operation_context)) + return adaptation diff --git a/src/python/_framework/face/_test_case.py b/src/python/_framework/face/_test_case.py new file mode 100644 index 0000000000..50b55c389f --- /dev/null +++ b/src/python/_framework/face/_test_case.py @@ -0,0 +1,81 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Common lifecycle code for in-memory-ticket-exchange Face-layer tests.""" + +from _framework.face import implementations +from _framework.face.testing import base_util +from _framework.face.testing import test_case +from _framework.foundation import logging_pool + +_TIMEOUT = 3 +_MAXIMUM_POOL_SIZE = 100 + + +class FaceTestCase(test_case.FaceTestCase): + """Provides abstract Face-layer tests an in-memory implementation.""" + + def set_up_implementation( + self, + name, + methods, + inline_value_in_value_out_methods, + inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods, + event_value_in_value_out_methods, + event_value_in_stream_out_methods, + event_stream_in_value_out_methods, + event_stream_in_stream_out_methods, + multi_method): + servicer_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE) + stub_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE) + + servicer = implementations.servicer( + servicer_pool, + inline_value_in_value_out_methods=inline_value_in_value_out_methods, + inline_value_in_stream_out_methods=inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods=inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods, + event_value_in_value_out_methods=event_value_in_value_out_methods, + event_value_in_stream_out_methods=event_value_in_stream_out_methods, + event_stream_in_value_out_methods=event_stream_in_value_out_methods, + event_stream_in_stream_out_methods=event_stream_in_stream_out_methods, + multi_method=multi_method) + + linked_pair = base_util.linked_pair(servicer, _TIMEOUT) + server = implementations.server() + stub = implementations.stub(linked_pair.front, stub_pool) + return server, stub, (servicer_pool, stub_pool, linked_pair) + + def tear_down_implementation(self, memo): + servicer_pool, stub_pool, linked_pair = memo + linked_pair.shut_down() + stub_pool.shutdown(wait=True) + servicer_pool.shutdown(wait=True) diff --git a/src/python/_framework/face/blocking_invocation_inline_service_test.py b/src/python/_framework/face/blocking_invocation_inline_service_test.py new file mode 100644 index 0000000000..96563c94ee --- /dev/null +++ b/src/python/_framework/face/blocking_invocation_inline_service_test.py @@ -0,0 +1,46 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""One of the tests of the Face layer of RPC Framework.""" + +import unittest + +from _framework.face import _test_case +from _framework.face.testing import blocking_invocation_inline_service_test_case as test_case + + +class BlockingInvocationInlineServiceTest( + _test_case.FaceTestCase, + test_case.BlockingInvocationInlineServiceTestCase, + unittest.TestCase): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/_framework/face/demonstration.py b/src/python/_framework/face/demonstration.py new file mode 100644 index 0000000000..501ec6b3f8 --- /dev/null +++ b/src/python/_framework/face/demonstration.py @@ -0,0 +1,118 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Demonstration-suitable implementation of the face layer of RPC Framework.""" + +from _framework.base import util as _base_util +from _framework.base.packets import implementations as _tickets_implementations +from _framework.face import implementations +from _framework.foundation import logging_pool + +_POOL_SIZE_LIMIT = 20 + +_MAXIMUM_TIMEOUT = 90 + + +class LinkedPair(object): + """A Server and Stub that are linked to one another. + + Attributes: + server: A Server. + stub: A Stub. + """ + + def shut_down(self): + """Shuts down this object and releases its resources.""" + raise NotImplementedError() + + +class _LinkedPair(LinkedPair): + + def __init__(self, server, stub, front, back, pools): + self.server = server + self.stub = stub + self._front = front + self._back = back + self._pools = pools + + def shut_down(self): + _base_util.wait_for_idle(self._front) + _base_util.wait_for_idle(self._back) + + for pool in self._pools: + pool.shutdown(wait=True) + + +def server_and_stub( + default_timeout, + inline_value_in_value_out_methods=None, + inline_value_in_stream_out_methods=None, + inline_stream_in_value_out_methods=None, + inline_stream_in_stream_out_methods=None, + event_value_in_value_out_methods=None, + event_value_in_stream_out_methods=None, + event_stream_in_value_out_methods=None, + event_stream_in_stream_out_methods=None, + multi_method=None): + """Creates a Server and Stub linked together for use.""" + front_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + stub_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + pools = ( + front_work_pool, front_transmission_pool, front_utility_pool, + back_work_pool, back_transmission_pool, back_utility_pool, + stub_pool) + + servicer = implementations.servicer( + back_work_pool, + inline_value_in_value_out_methods=inline_value_in_value_out_methods, + inline_value_in_stream_out_methods=inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods=inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods, + event_value_in_value_out_methods=event_value_in_value_out_methods, + event_value_in_stream_out_methods=event_value_in_stream_out_methods, + event_stream_in_value_out_methods=event_stream_in_value_out_methods, + event_stream_in_stream_out_methods=event_stream_in_stream_out_methods, + multi_method=multi_method) + + front = _tickets_implementations.front( + front_work_pool, front_transmission_pool, front_utility_pool) + back = _tickets_implementations.back( + servicer, back_work_pool, back_transmission_pool, back_utility_pool, + default_timeout, _MAXIMUM_TIMEOUT) + front.join_rear_link(back) + back.join_fore_link(front) + + stub = implementations.stub(front, stub_pool) + + return _LinkedPair(implementations.server(), stub, front, back, pools) diff --git a/src/python/_framework/face/event_invocation_synchronous_event_service_test.py b/src/python/_framework/face/event_invocation_synchronous_event_service_test.py new file mode 100644 index 0000000000..48e05b2478 --- /dev/null +++ b/src/python/_framework/face/event_invocation_synchronous_event_service_test.py @@ -0,0 +1,46 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""One of the tests of the Face layer of RPC Framework.""" + +import unittest + +from _framework.face import _test_case +from _framework.face.testing import event_invocation_synchronous_event_service_test_case as test_case + + +class EventInvocationSynchronousEventServiceTest( + _test_case.FaceTestCase, + test_case.EventInvocationSynchronousEventServiceTestCase, + unittest.TestCase): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/_framework/face/exceptions.py b/src/python/_framework/face/exceptions.py new file mode 100644 index 0000000000..f112df70bc --- /dev/null +++ b/src/python/_framework/face/exceptions.py @@ -0,0 +1,77 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Exceptions used in the Face layer of RPC Framework.""" + +import abc + + +class NoSuchMethodError(Exception): + """Raised by customer code to indicate an unrecognized RPC method name. + + Attributes: + name: The unrecognized name. + """ + + def __init__(self, name): + """Constructor. + + Args: + name: The unrecognized RPC method name. + """ + super(NoSuchMethodError, self).__init__() + self.name = name + + +class RpcError(Exception): + """Common super type for all exceptions raised by the Face layer. + + Only RPC Framework should instantiate and raise these exceptions. + """ + __metaclass__ = abc.ABCMeta + + +class CancellationError(RpcError): + """Indicates that an RPC has been cancelled.""" + + +class ExpirationError(RpcError): + """Indicates that an RPC has expired ("timed out").""" + + +class NetworkError(RpcError): + """Indicates that some error occurred on the network.""" + + +class ServicedError(RpcError): + """Indicates that the Serviced failed in the course of an RPC.""" + + +class ServicerError(RpcError): + """Indicates that the Servicer failed in the course of servicing an RPC.""" diff --git a/src/python/_framework/face/future_invocation_asynchronous_event_service_test.py b/src/python/_framework/face/future_invocation_asynchronous_event_service_test.py new file mode 100644 index 0000000000..96f5fe85d3 --- /dev/null +++ b/src/python/_framework/face/future_invocation_asynchronous_event_service_test.py @@ -0,0 +1,46 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""One of the tests of the Face layer of RPC Framework.""" + +import unittest + +from _framework.face import _test_case +from _framework.face.testing import future_invocation_asynchronous_event_service_test_case as test_case + + +class FutureInvocationAsynchronousEventServiceTest( + _test_case.FaceTestCase, + test_case.FutureInvocationAsynchronousEventServiceTestCase, + unittest.TestCase): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/_framework/face/implementations.py b/src/python/_framework/face/implementations.py new file mode 100644 index 0000000000..94362e2007 --- /dev/null +++ b/src/python/_framework/face/implementations.py @@ -0,0 +1,246 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Entry points into the Face layer of RPC Framework.""" + +from _framework.base import exceptions as _base_exceptions +from _framework.base import interfaces as base_interfaces +from _framework.face import _calls +from _framework.face import _service +from _framework.face import exceptions +from _framework.face import interfaces + + +class _BaseServicer(base_interfaces.Servicer): + + def __init__(self, methods, multi_method): + self._methods = methods + self._multi_method = multi_method + + def service(self, name, context, output_consumer): + method = self._methods.get(name, None) + if method is not None: + return method(output_consumer, context) + elif self._multi_method is not None: + try: + return self._multi_method.service(name, output_consumer, context) + except exceptions.NoSuchMethodError: + raise _base_exceptions.NoSuchMethodError() + else: + raise _base_exceptions.NoSuchMethodError() + + +class _Server(interfaces.Server): + """An interfaces.Server implementation.""" + + +class _Stub(interfaces.Stub): + """An interfaces.Stub implementation.""" + + def __init__(self, front, pool): + self._front = front + self._pool = pool + + def blocking_value_in_value_out(self, name, request, timeout): + return _calls.blocking_value_in_value_out( + self._front, name, request, timeout, 'unused trace ID') + + def future_value_in_value_out(self, name, request, timeout): + return _calls.future_value_in_value_out( + self._front, name, request, timeout, 'unused trace ID') + + def inline_value_in_stream_out(self, name, request, timeout): + return _calls.inline_value_in_stream_out( + self._front, name, request, timeout, 'unused trace ID') + + def blocking_stream_in_value_out(self, name, request_iterator, timeout): + return _calls.blocking_stream_in_value_out( + self._front, name, request_iterator, timeout, 'unused trace ID') + + def future_stream_in_value_out(self, name, request_iterator, timeout): + return _calls.future_stream_in_value_out( + self._front, name, request_iterator, timeout, 'unused trace ID', + self._pool) + + def inline_stream_in_stream_out(self, name, request_iterator, timeout): + return _calls.inline_stream_in_stream_out( + self._front, name, request_iterator, timeout, 'unused trace ID', + self._pool) + + def event_value_in_value_out( + self, name, request, response_callback, abortion_callback, timeout): + return _calls.event_value_in_value_out( + self._front, name, request, response_callback, abortion_callback, + timeout, 'unused trace ID') + + def event_value_in_stream_out( + self, name, request, response_consumer, abortion_callback, timeout): + return _calls.event_value_in_stream_out( + self._front, name, request, response_consumer, abortion_callback, + timeout, 'unused trace ID') + + def event_stream_in_value_out( + self, name, response_callback, abortion_callback, timeout): + return _calls.event_stream_in_value_out( + self._front, name, response_callback, abortion_callback, timeout, + 'unused trace ID') + + def event_stream_in_stream_out( + self, name, response_consumer, abortion_callback, timeout): + return _calls.event_stream_in_stream_out( + self._front, name, response_consumer, abortion_callback, timeout, + 'unused trace ID') + + +def _aggregate_methods( + pool, + inline_value_in_value_out_methods, + inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods, + event_value_in_value_out_methods, + event_value_in_stream_out_methods, + event_stream_in_value_out_methods, + event_stream_in_stream_out_methods): + """Aggregates methods coded in according to different interfaces.""" + methods = {} + + def adapt_unpooled_methods(adapted_methods, unadapted_methods, adaptation): + if unadapted_methods is not None: + for name, unadapted_method in unadapted_methods.iteritems(): + adapted_methods[name] = adaptation(unadapted_method) + + def adapt_pooled_methods(adapted_methods, unadapted_methods, adaptation): + if unadapted_methods is not None: + for name, unadapted_method in unadapted_methods.iteritems(): + adapted_methods[name] = adaptation(unadapted_method, pool) + + adapt_unpooled_methods( + methods, inline_value_in_value_out_methods, + _service.adapt_inline_value_in_value_out) + adapt_unpooled_methods( + methods, inline_value_in_stream_out_methods, + _service.adapt_inline_value_in_stream_out) + adapt_pooled_methods( + methods, inline_stream_in_value_out_methods, + _service.adapt_inline_stream_in_value_out) + adapt_pooled_methods( + methods, inline_stream_in_stream_out_methods, + _service.adapt_inline_stream_in_stream_out) + adapt_unpooled_methods( + methods, event_value_in_value_out_methods, + _service.adapt_event_value_in_value_out) + adapt_unpooled_methods( + methods, event_value_in_stream_out_methods, + _service.adapt_event_value_in_stream_out) + adapt_unpooled_methods( + methods, event_stream_in_value_out_methods, + _service.adapt_event_stream_in_value_out) + adapt_unpooled_methods( + methods, event_stream_in_stream_out_methods, + _service.adapt_event_stream_in_stream_out) + + return methods + + +def servicer( + pool, + inline_value_in_value_out_methods=None, + inline_value_in_stream_out_methods=None, + inline_stream_in_value_out_methods=None, + inline_stream_in_stream_out_methods=None, + event_value_in_value_out_methods=None, + event_value_in_stream_out_methods=None, + event_stream_in_value_out_methods=None, + event_stream_in_stream_out_methods=None, + multi_method=None): + """Creates a base_interfaces.Servicer. + + The key sets of the passed dictionaries must be disjoint. It is guaranteed + that any passed MultiMethod implementation will only be called to service an + RPC if the RPC method name is not present in the key sets of the passed + dictionaries. + + Args: + pool: A thread pool. + inline_value_in_value_out_methods: A dictionary mapping method names to + interfaces.InlineValueInValueOutMethod implementations. + inline_value_in_stream_out_methods: A dictionary mapping method names to + interfaces.InlineValueInStreamOutMethod implementations. + inline_stream_in_value_out_methods: A dictionary mapping method names to + interfaces.InlineStreamInValueOutMethod implementations. + inline_stream_in_stream_out_methods: A dictionary mapping method names to + interfaces.InlineStreamInStreamOutMethod implementations. + event_value_in_value_out_methods: A dictionary mapping method names to + interfaces.EventValueInValueOutMethod implementations. + event_value_in_stream_out_methods: A dictionary mapping method names to + interfaces.EventValueInStreamOutMethod implementations. + event_stream_in_value_out_methods: A dictionary mapping method names to + interfaces.EventStreamInValueOutMethod implementations. + event_stream_in_stream_out_methods: A dictionary mapping method names to + interfaces.EventStreamInStreamOutMethod implementations. + multi_method: An implementation of interfaces.MultiMethod. + + Returns: + A base_interfaces.Servicer that services RPCs via the given implementations. + """ + methods = _aggregate_methods( + pool, + inline_value_in_value_out_methods, + inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods, + event_value_in_value_out_methods, + event_value_in_stream_out_methods, + event_stream_in_value_out_methods, + event_stream_in_stream_out_methods) + + return _BaseServicer(methods, multi_method) + + +def server(): + """Creates an interfaces.Server. + + Returns: + An interfaces.Server. + """ + return _Server() + + +def stub(front, pool): + """Creates an interfaces.Stub. + + Args: + front: A base_interfaces.Front. + pool: A futures.ThreadPoolExecutor. + + Returns: + An interfaces.Stub that performs RPCs via the given base_interfaces.Front. + """ + return _Stub(front, pool) diff --git a/src/python/_framework/face/interfaces.py b/src/python/_framework/face/interfaces.py new file mode 100644 index 0000000000..0cc7c70df3 --- /dev/null +++ b/src/python/_framework/face/interfaces.py @@ -0,0 +1,545 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces for the face layer of RPC Framework.""" + +import abc + +# exceptions, abandonment, and future are referenced from specification in this +# module. +from _framework.face import exceptions # pylint: disable=unused-import +from _framework.foundation import abandonment # pylint: disable=unused-import +from _framework.foundation import future # pylint: disable=unused-import + + +class CancellableIterator(object): + """Implements the Iterator protocol and affords a cancel method.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __iter__(self): + """Returns the self object in accordance with the Iterator protocol.""" + raise NotImplementedError() + + @abc.abstractmethod + def next(self): + """Returns a value or raises StopIteration per the Iterator protocol.""" + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + """Requests cancellation of whatever computation underlies this iterator.""" + raise NotImplementedError() + + +# Constants that categorize RPC abortion. +# TODO(nathaniel): Learn and use Python's enum library for this de facto +# enumerated type +CANCELLED = 'abortion: cancelled' +EXPIRED = 'abortion: expired' +NETWORK_FAILURE = 'abortion: network failure' +SERVICED_FAILURE = 'abortion: serviced failure' +SERVICER_FAILURE = 'abortion: servicer failure' + + +class RpcContext(object): + """Provides RPC-related information and action.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def is_active(self): + """Describes whether the RPC is active or has terminated.""" + raise NotImplementedError() + + @abc.abstractmethod + def time_remaining(self): + """Describes the length of allowed time remaining for the RPC. + + Returns: + A nonnegative float indicating the length of allowed time in seconds + remaining for the RPC to complete before it is considered to have timed + out. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_abortion_callback(self, abortion_callback): + """Registers a callback to be called if the RPC is aborted. + + Args: + abortion_callback: A callable to be called and passed one of CANCELLED, + EXPIRED, NETWORK_FAILURE, SERVICED_FAILURE, or SERVICER_FAILURE in the + event of RPC abortion. + """ + raise NotImplementedError() + + +class InlineValueInValueOutMethod(object): + """A type for inline unary-request-unary-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, context): + """Services an RPC that accepts one value and produces one value. + + Args: + request: The single request value for the RPC. + context: An RpcContext object. + + Returns: + The single response value for the RPC. + + Raises: + abandonment.Abandoned: If no response is necessary because the RPC has + been aborted. + """ + raise NotImplementedError() + + +class InlineValueInStreamOutMethod(object): + """A type for inline unary-request-stream-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, context): + """Services an RPC that accepts one value and produces a stream of values. + + Args: + request: The single request value for the RPC. + context: An RpcContext object. + + Yields: + The values that comprise the response stream of the RPC. + + Raises: + abandonment.Abandoned: If completing the response stream is not necessary + because the RPC has been aborted. + """ + raise NotImplementedError() + + +class InlineStreamInValueOutMethod(object): + """A type for inline stream-request-unary-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request_iterator, context): + """Services an RPC that accepts a stream of values and produces one value. + + Args: + request_iterator: An iterator that yields the request values of the RPC. + Drawing values from this iterator may also raise exceptions.RpcError to + indicate abortion of the RPC. + context: An RpcContext object. + + Yields: + The values that comprise the response stream of the RPC. + + Raises: + abandonment.Abandoned: If no response is necessary because the RPC has + been aborted. + exceptions.RpcError: Implementations of this method must not deliberately + raise exceptions.RpcError but may allow such errors raised by the + request_iterator passed to them to propagate through their bodies + uncaught. + """ + raise NotImplementedError() + + +class InlineStreamInStreamOutMethod(object): + """A type for inline stream-request-stream-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request_iterator, context): + """Services an RPC that accepts and produces streams of values. + + Args: + request_iterator: An iterator that yields the request values of the RPC. + Drawing values from this iterator may also raise exceptions.RpcError to + indicate abortion of the RPC. + context: An RpcContext object. + + Yields: + The values that comprise the response stream of the RPC. + + Raises: + abandonment.Abandoned: If completing the response stream is not necessary + because the RPC has been aborted. + exceptions.RpcError: Implementations of this method must not deliberately + raise exceptions.RpcError but may allow such errors raised by the + request_iterator passed to them to propagate through their bodies + uncaught. + """ + raise NotImplementedError() + + +class EventValueInValueOutMethod(object): + """A type for event-driven unary-request-unary-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_callback, context): + """Services an RPC that accepts one value and produces one value. + + Args: + request: The single request value for the RPC. + response_callback: A callback to be called to accept the response value of + the RPC. + context: An RpcContext object. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class EventValueInStreamOutMethod(object): + """A type for event-driven unary-request-stream-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_consumer, context): + """Services an RPC that accepts one value and produces a stream of values. + + Args: + request: The single request value for the RPC. + response_consumer: A stream.Consumer to be called to accept the response + values of the RPC. + context: An RpcContext object. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class EventStreamInValueOutMethod(object): + """A type for event-driven stream-request-unary-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_callback, context): + """Services an RPC that accepts a stream of values and produces one value. + + Args: + response_callback: A callback to be called to accept the response value of + the RPC. + context: An RpcContext object. + + Returns: + A stream.Consumer with which to accept the request values of the RPC. The + consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing values to this object. Implementations must not assume that this + object will be called to completion of the request stream or even called + at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class EventStreamInStreamOutMethod(object): + """A type for event-driven stream-request-stream-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_consumer, context): + """Services an RPC that accepts and produces streams of values. + + Args: + response_consumer: A stream.Consumer to be called to accept the response + values of the RPC. + context: An RpcContext object. + + Returns: + A stream.Consumer with which to accept the request values of the RPC. The + consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing values to this object. Implementations must not assume that this + object will be called to completion of the request stream or even called + at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class MultiMethod(object): + """A general type able to service many RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, name, response_consumer, context): + """Services an RPC. + + Args: + name: The RPC method name. + response_consumer: A stream.Consumer to be called to accept the response + values of the RPC. + context: An RpcContext object. + + Returns: + A stream.Consumer with which to accept the request values of the RPC. The + consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing values to this object. Implementations must not assume that this + object will be called to completion of the request stream or even called + at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + exceptions.NoSuchMethodError: If this MultiMethod does not recognize the + given RPC method name and is not able to service the RPC. + """ + raise NotImplementedError() + + +class Server(object): + """Specification of a running server that services RPCs.""" + __metaclass__ = abc.ABCMeta + + +class Call(object): + """Invocation-side representation of an RPC. + + Attributes: + context: An RpcContext affording information about the RPC. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def cancel(self): + """Requests cancellation of the RPC.""" + raise NotImplementedError() + + +class Stub(object): + """Affords RPC methods to callers.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def blocking_value_in_value_out(self, name, request, timeout): + """Invokes a unary-request-unary-response RPC method. + + This method blocks until either returning the response value of the RPC + (in the event of RPC completion) or raising an exception (in the event of + RPC abortion). + + Args: + name: The RPC method name. + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + The response value for the RPC. + + Raises: + exceptions.RpcError: Indicating that the RPC was aborted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def future_value_in_value_out(self, name, request, timeout): + """Invokes a unary-request-unary-response RPC method. + + Args: + name: The RPC method name. + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A future.Future representing the RPC. In the event of RPC completion, the + returned Future will return an outcome indicating that the RPC returned + the response value of the RPC. In the event of RPC abortion, the + returned Future will return an outcome indicating that the RPC raised + an exceptions.RpcError. + """ + raise NotImplementedError() + + @abc.abstractmethod + def inline_value_in_stream_out(self, name, request, timeout): + """Invokes a unary-request-stream-response RPC method. + + Args: + name: The RPC method name. + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A CancellableIterator that yields the response values of the RPC and + affords RPC cancellation. Drawing response values from the returned + CancellableIterator may raise exceptions.RpcError indicating abortion of + the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def blocking_stream_in_value_out(self, name, request_iterator, timeout): + """Invokes a stream-request-unary-response RPC method. + + This method blocks until either returning the response value of the RPC + (in the event of RPC completion) or raising an exception (in the event of + RPC abortion). + + Args: + name: The RPC method name. + request_iterator: An iterator that yields the request values of the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + The response value for the RPC. + + Raises: + exceptions.RpcError: Indicating that the RPC was aborted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def future_stream_in_value_out(self, name, request_iterator, timeout): + """Invokes a stream-request-unary-response RPC method. + + Args: + name: The RPC method name. + request_iterator: An iterator that yields the request values of the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A future.Future representing the RPC. In the event of RPC completion, the + returned Future will return an outcome indicating that the RPC returned + the response value of the RPC. In the event of RPC abortion, the + returned Future will return an outcome indicating that the RPC raised + an exceptions.RpcError. + """ + raise NotImplementedError() + + @abc.abstractmethod + def inline_stream_in_stream_out(self, name, request_iterator, timeout): + """Invokes a stream-request-stream-response RPC method. + + Args: + name: The RPC method name. + request_iterator: An iterator that yields the request values of the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A CancellableIterator that yields the response values of the RPC and + affords RPC cancellation. Drawing response values from the returned + CancellableIterator may raise exceptions.RpcError indicating abortion of + the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def event_value_in_value_out( + self, name, request, response_callback, abortion_callback, timeout): + """Event-driven invocation of a unary-request-unary-response RPC method. + + Args: + name: The RPC method name. + request: The request value for the RPC. + response_callback: A callback to be called to accept the response value + of the RPC. + abortion_callback: A callback to be called to accept one of CANCELLED, + EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC + abortion. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A Call object for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def event_value_in_stream_out( + self, name, request, response_consumer, abortion_callback, timeout): + """Event-driven invocation of a unary-request-stream-response RPC method. + + Args: + name: The RPC method name. + request: The request value for the RPC. + response_consumer: A stream.Consumer to be called to accept the response + values of the RPC. + abortion_callback: A callback to be called to accept one of CANCELLED, + EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC + abortion. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A Call object for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def event_stream_in_value_out( + self, name, response_callback, abortion_callback, timeout): + """Event-driven invocation of a unary-request-unary-response RPC method. + + Args: + name: The RPC method name. + response_callback: A callback to be called to accept the response value + of the RPC. + abortion_callback: A callback to be called to accept one of CANCELLED, + EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC + abortion. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A pair of a Call object for the RPC and a stream.Consumer to which the + request values of the RPC should be passed. + """ + raise NotImplementedError() + + @abc.abstractmethod + def event_stream_in_stream_out( + self, name, response_consumer, abortion_callback, timeout): + """Event-driven invocation of a unary-request-stream-response RPC method. + + Args: + name: The RPC method name. + response_consumer: A stream.Consumer to be called to accept the response + values of the RPC. + abortion_callback: A callback to be called to accept one of CANCELLED, + EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC + abortion. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A pair of a Call object for the RPC and a stream.Consumer to which the + request values of the RPC should be passed. + """ + raise NotImplementedError() diff --git a/src/python/_framework/face/testing/__init__.py b/src/python/_framework/face/testing/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/_framework/face/testing/__init__.py diff --git a/src/python/_framework/face/testing/base_util.py b/src/python/_framework/face/testing/base_util.py new file mode 100644 index 0000000000..d9ccb3af8f --- /dev/null +++ b/src/python/_framework/face/testing/base_util.py @@ -0,0 +1,102 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for creating Base-layer objects for use in Face-layer tests.""" + +import abc + +# interfaces is referenced from specification in this module. +from _framework.base import util as _base_util +from _framework.base.packets import implementations +from _framework.base.packets import in_memory +from _framework.base.packets import interfaces # pylint: disable=unused-import +from _framework.foundation import logging_pool + +_POOL_SIZE_LIMIT = 20 + +_MAXIMUM_TIMEOUT = 90 + + +class LinkedPair(object): + """A Front and Back that are linked to one another. + + Attributes: + front: An interfaces.Front. + back: An interfaces.Back. + """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def shut_down(self): + """Shuts down this object and releases its resources.""" + raise NotImplementedError() + + +class _LinkedPair(LinkedPair): + + def __init__(self, front, back, pools): + self.front = front + self.back = back + self._pools = pools + + def shut_down(self): + _base_util.wait_for_idle(self.front) + _base_util.wait_for_idle(self.back) + + for pool in self._pools: + pool.shutdown(wait=True) + + +def linked_pair(servicer, default_timeout): + """Creates a Server and Stub linked together for use.""" + link_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + pools = ( + link_pool, + front_work_pool, front_transmission_pool, front_utility_pool, + back_work_pool, back_transmission_pool, back_utility_pool) + + link = in_memory.Link(link_pool) + front = implementations.front( + front_work_pool, front_transmission_pool, front_utility_pool) + back = implementations.back( + servicer, back_work_pool, back_transmission_pool, back_utility_pool, + default_timeout, _MAXIMUM_TIMEOUT) + front.join_rear_link(link) + link.join_fore_link(front) + back.join_fore_link(link) + link.join_rear_link(back) + + return _LinkedPair(front, back, pools) diff --git a/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py new file mode 100644 index 0000000000..0b1a2f0bd2 --- /dev/null +++ b/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py @@ -0,0 +1,223 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A test to verify an implementation of the Face layer of RPC Framework.""" + +# unittest is referenced from specification in this module. +import abc +import unittest # pylint: disable=unused-import + +from _framework.face import exceptions +from _framework.face.testing import control +from _framework.face.testing import coverage +from _framework.face.testing import digest +from _framework.face.testing import stock_service +from _framework.face.testing import test_case + +_TIMEOUT = 3 + + +class BlockingInvocationInlineServiceTestCase( + test_case.FaceTestCase, coverage.BlockingCoverage): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must also extend unittest.TestCase. + """ + __metaclass__ = abc.ABCMeta + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self.control = control.PauseFailControl() + self.digest = digest.digest( + stock_service.STOCK_TEST_SERVICE, self.control, None) + + self.server, self.stub, self.memo = self.set_up_implementation( + self.digest.name, self.digest.methods, + self.digest.inline_unary_unary_methods, + self.digest.inline_unary_stream_methods, + self.digest.inline_stream_unary_methods, + self.digest.inline_stream_stream_methods, + {}, {}, {}, {}, None) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.tear_down_implementation(self.memo) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response = self.stub.blocking_value_in_value_out( + name, request, _TIMEOUT) + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response = self.stub.blocking_stream_in_value_out( + name, iter(requests), _TIMEOUT) + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response = self.stub.blocking_value_in_value_out( + name, first_request, _TIMEOUT) + + test_messages.verify(first_request, first_response, self) + + second_response = self.stub.blocking_value_in_value_out( + name, second_request, _TIMEOUT) + + test_messages.verify(second_request, second_response, self) + + def testExpiredUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + self.stub.blocking_value_in_value_out(name, request, _TIMEOUT) + + def testExpiredUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT) + + def testExpiredStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + self.stub.blocking_value_in_value_out(name, request, _TIMEOUT) + + def testFailedUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT) + + def testFailedStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + list(response_iterator) diff --git a/src/python/_framework/face/testing/callback.py b/src/python/_framework/face/testing/callback.py new file mode 100644 index 0000000000..7a20869abe --- /dev/null +++ b/src/python/_framework/face/testing/callback.py @@ -0,0 +1,94 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A utility useful in tests of asynchronous, event-driven interfaces.""" + +import threading + +from _framework.foundation import stream + + +class Callback(stream.Consumer): + """A utility object useful in tests of asynchronous code.""" + + def __init__(self): + self._condition = threading.Condition() + self._unary_response = None + self._streamed_responses = [] + self._completed = False + self._abortion = None + + def abort(self, abortion): + with self._condition: + self._abortion = abortion + self._condition.notify_all() + + def complete(self, unary_response): + with self._condition: + self._unary_response = unary_response + self._completed = True + self._condition.notify_all() + + def consume(self, streamed_response): + with self._condition: + self._streamed_responses.append(streamed_response) + + def terminate(self): + with self._condition: + self._completed = True + self._condition.notify_all() + + def consume_and_terminate(self, streamed_response): + with self._condition: + self._streamed_responses.append(streamed_response) + self._completed = True + self._condition.notify_all() + + def block_until_terminated(self): + with self._condition: + while self._abortion is None and not self._completed: + self._condition.wait() + + def response(self): + with self._condition: + if self._abortion is None: + return self._unary_response + else: + raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + + def responses(self): + with self._condition: + if self._abortion is None: + return list(self._streamed_responses) + else: + raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + + def abortion(self): + with self._condition: + return self._abortion diff --git a/src/python/_framework/face/testing/control.py b/src/python/_framework/face/testing/control.py new file mode 100644 index 0000000000..3960c4e649 --- /dev/null +++ b/src/python/_framework/face/testing/control.py @@ -0,0 +1,87 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Code for instructing systems under test to block or fail.""" + +import abc +import contextlib +import threading + + +class Control(object): + """An object that accepts program control from a system under test. + + Systems under test passed a Control should call its control() method + frequently during execution. The control() method may block, raise an + exception, or do nothing, all according to the enclosing test's desire for + the system under test to simulate hanging, failing, or functioning. + """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def control(self): + """Potentially does anything.""" + raise NotImplementedError() + + +class PauseFailControl(Control): + """A Control that can be used to pause or fail code under control.""" + + def __init__(self): + self._condition = threading.Condition() + self._paused = False + self._fail = False + + def control(self): + with self._condition: + if self._fail: + raise ValueError() + + while self._paused: + self._condition.wait() + + @contextlib.contextmanager + def pause(self): + """Pauses code under control while controlling code is in context.""" + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + @contextlib.contextmanager + def fail(self): + """Fails code under control while controlling code is in context.""" + with self._condition: + self._fail = True + yield + with self._condition: + self._fail = False diff --git a/src/python/_framework/face/testing/coverage.py b/src/python/_framework/face/testing/coverage.py new file mode 100644 index 0000000000..f3aca113fe --- /dev/null +++ b/src/python/_framework/face/testing/coverage.py @@ -0,0 +1,123 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Governs coverage for the tests of the Face layer of RPC Framework.""" + +import abc + +# These classes are only valid when inherited by unittest.TestCases. +# pylint: disable=invalid-name + + +class BlockingCoverage(object): + """Specification of test coverage for blocking behaviors.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def testSuccessfulUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulStreamRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSequentialInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredStreamRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedStreamRequestStreamResponse(self): + raise NotImplementedError() + + +class FullCoverage(BlockingCoverage): + """Specification of test coverage for non-blocking behaviors.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def testParallelInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledStreamRequestStreamResponse(self): + raise NotImplementedError() diff --git a/src/python/_framework/face/testing/digest.py b/src/python/_framework/face/testing/digest.py new file mode 100644 index 0000000000..8d1291c975 --- /dev/null +++ b/src/python/_framework/face/testing/digest.py @@ -0,0 +1,446 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Code for making a service.TestService more amenable to use in tests.""" + +import collections +import threading + +# testing_control, interfaces, and testing_service are referenced from +# specification in this module. +from _framework.face import exceptions +from _framework.face import interfaces as face_interfaces +from _framework.face.testing import control as testing_control # pylint: disable=unused-import +from _framework.face.testing import interfaces # pylint: disable=unused-import +from _framework.face.testing import service as testing_service # pylint: disable=unused-import +from _framework.foundation import stream +from _framework.foundation import stream_util + +_IDENTITY = lambda x: x + + +class TestServiceDigest( + collections.namedtuple( + 'TestServiceDigest', + ['name', + 'methods', + 'inline_unary_unary_methods', + 'inline_unary_stream_methods', + 'inline_stream_unary_methods', + 'inline_stream_stream_methods', + 'event_unary_unary_methods', + 'event_unary_stream_methods', + 'event_stream_unary_methods', + 'event_stream_stream_methods', + 'multi_method', + 'unary_unary_messages_sequences', + 'unary_stream_messages_sequences', + 'stream_unary_messages_sequences', + 'stream_stream_messages_sequences'])): + """A transformation of a service.TestService. + + Attributes: + name: The RPC service name to be used in the test. + methods: A sequence of interfaces.Method objects describing the RPC + methods that will be called during the test. + inline_unary_unary_methods: A dict from method name to + face_interfaces.InlineValueInValueOutMethod object to be used in tests of + in-line calls to behaviors under test. + inline_unary_stream_methods: A dict from method name to + face_interfaces.InlineValueInStreamOutMethod object to be used in tests of + in-line calls to behaviors under test. + inline_stream_unary_methods: A dict from method name to + face_interfaces.InlineStreamInValueOutMethod object to be used in tests of + in-line calls to behaviors under test. + inline_stream_stream_methods: A dict from method name to + face_interfaces.InlineStreamInStreamOutMethod object to be used in tests + of in-line calls to behaviors under test. + event_unary_unary_methods: A dict from method name to + face_interfaces.EventValueInValueOutMethod object to be used in tests of + event-driven calls to behaviors under test. + event_unary_stream_methods: A dict from method name to + face_interfaces.EventValueInStreamOutMethod object to be used in tests of + event-driven calls to behaviors under test. + event_stream_unary_methods: A dict from method name to + face_interfaces.EventStreamInValueOutMethod object to be used in tests of + event-driven calls to behaviors under test. + event_stream_stream_methods: A dict from method name to + face_interfaces.EventStreamInStreamOutMethod object to be used in tests of + event-driven calls to behaviors under test. + multi_method: A face_interfaces.MultiMethod to be used in tests of generic + calls to behaviors under test. + unary_unary_messages_sequences: A dict from method name to sequence of + service.UnaryUnaryTestMessages objects to be used to test the method + with the given name. + unary_stream_messages_sequences: A dict from method name to sequence of + service.UnaryStreamTestMessages objects to be used to test the method + with the given name. + stream_unary_messages_sequences: A dict from method name to sequence of + service.StreamUnaryTestMessages objects to be used to test the method + with the given name. + stream_stream_messages_sequences: A dict from method name to sequence of + service.StreamStreamTestMessages objects to be used to test the + method with the given name. + serialization: A serial.Serialization object describing serialization + behaviors for all the RPC methods. + """ + + +class _BufferingConsumer(stream.Consumer): + """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" + + def __init__(self): + self.consumed = [] + self.terminated = False + + def consume(self, value): + self.consumed.append(value) + + def terminate(self): + self.terminated = True + + def consume_and_terminate(self, value): + self.consumed.append(value) + self.terminated = True + + +class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod): + + def __init__(self, unary_unary_test_method, control): + self._test_method = unary_unary_test_method + self._control = control + + def service(self, request, context): + response_list = [] + self._test_method.service( + request, response_list.append, context, self._control) + return response_list.pop(0) + + +class _EventUnaryUnaryMethod(face_interfaces.EventValueInValueOutMethod): + + def __init__(self, unary_unary_test_method, control, pool): + self._test_method = unary_unary_test_method + self._control = control + self._pool = pool + + def service(self, request, response_callback, context): + if self._pool is None: + self._test_method.service( + request, response_callback, context, self._control) + else: + self._pool.submit( + self._test_method.service, request, response_callback, context, + self._control) + + +class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod): + + def __init__(self, unary_stream_test_method, control): + self._test_method = unary_stream_test_method + self._control = control + + def service(self, request, context): + response_consumer = _BufferingConsumer() + self._test_method.service( + request, response_consumer, context, self._control) + for response in response_consumer.consumed: + yield response + + +class _EventUnaryStreamMethod(face_interfaces.EventValueInStreamOutMethod): + + def __init__(self, unary_stream_test_method, control, pool): + self._test_method = unary_stream_test_method + self._control = control + self._pool = pool + + def service(self, request, response_consumer, context): + if self._pool is None: + self._test_method.service( + request, response_consumer, context, self._control) + else: + self._pool.submit( + self._test_method.service, request, response_consumer, context, + self._control) + + +class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod): + + def __init__(self, stream_unary_test_method, control): + self._test_method = stream_unary_test_method + self._control = control + + def service(self, request_iterator, context): + response_list = [] + request_consumer = self._test_method.service( + response_list.append, context, self._control) + for request in request_iterator: + request_consumer.consume(request) + request_consumer.terminate() + return response_list.pop(0) + + +class _EventStreamUnaryMethod(face_interfaces.EventStreamInValueOutMethod): + + def __init__(self, stream_unary_test_method, control, pool): + self._test_method = stream_unary_test_method + self._control = control + self._pool = pool + + def service(self, response_callback, context): + request_consumer = self._test_method.service( + response_callback, context, self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod): + + def __init__(self, stream_stream_test_method, control): + self._test_method = stream_stream_test_method + self._control = control + + def service(self, request_iterator, context): + response_consumer = _BufferingConsumer() + request_consumer = self._test_method.service( + response_consumer, context, self._control) + + for request in request_iterator: + request_consumer.consume(request) + while response_consumer.consumed: + yield response_consumer.consumed.pop(0) + response_consumer.terminate() + + +class _EventStreamStreamMethod(face_interfaces.EventStreamInStreamOutMethod): + + def __init__(self, stream_stream_test_method, control, pool): + self._test_method = stream_stream_test_method + self._control = control + self._pool = pool + + def service(self, response_consumer, context): + request_consumer = self._test_method.service( + response_consumer, context, self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _UnaryConsumer(stream.Consumer): + """A Consumer that only allows consumption of exactly one value.""" + + def __init__(self, action): + self._lock = threading.Lock() + self._action = action + self._consumed = False + self._terminated = False + + def consume(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + + self._action(value) + + def terminate(self): + with self._lock: + if not self._consumed: + raise ValueError('Unary consumer hasn\'t yet consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._terminated = True + + def consume_and_terminate(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + self._terminated = True + + self._action(value) + + +class _UnaryUnaryAdaptation(object): + + def __init__(self, unary_unary_test_method): + self._method = unary_unary_test_method + + def service(self, response_consumer, context, control): + def action(request): + self._method.service( + request, response_consumer.consume_and_terminate, context, control) + return _UnaryConsumer(action) + + +class _UnaryStreamAdaptation(object): + + def __init__(self, unary_stream_test_method): + self._method = unary_stream_test_method + + def service(self, response_consumer, context, control): + def action(request): + self._method.service(request, response_consumer, context, control) + return _UnaryConsumer(action) + + +class _StreamUnaryAdaptation(object): + + def __init__(self, stream_unary_test_method): + self._method = stream_unary_test_method + + def service(self, response_consumer, context, control): + return self._method.service( + response_consumer.consume_and_terminate, context, control) + + +class _MultiMethod(face_interfaces.MultiMethod): + + def __init__(self, methods, control, pool): + self._methods = methods + self._control = control + self._pool = pool + + def service(self, name, response_consumer, context): + method = self._methods.get(name, None) + if method is None: + raise exceptions.NoSuchMethodError(name) + elif self._pool is None: + return method(response_consumer, context, self._control) + else: + request_consumer = method(response_consumer, context, self._control) + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _Assembly( + collections.namedtuple( + '_Assembly', + ['methods', 'inlines', 'events', 'adaptations', 'messages'])): + """An intermediate structure created when creating a TestServiceDigest.""" + + +def _assemble( + scenarios, names, inline_method_constructor, event_method_constructor, + adapter, control, pool): + """Creates an _Assembly from the given scenarios.""" + methods = [] + inlines = {} + events = {} + adaptations = {} + messages = {} + for name, scenario in scenarios.iteritems(): + if name in names: + raise ValueError('Repeated name "%s"!' % name) + + test_method = scenario[0] + inline_method = inline_method_constructor(test_method, control) + event_method = event_method_constructor(test_method, control, pool) + adaptation = adapter(test_method) + + methods.append(test_method) + inlines[name] = inline_method + events[name] = event_method + adaptations[name] = adaptation + messages[name] = scenario[1] + + return _Assembly(methods, inlines, events, adaptations, messages) + + +def digest(service, control, pool): + """Creates a TestServiceDigest from a TestService. + + Args: + service: A testing_service.TestService. + control: A testing_control.Control. + pool: If RPC methods should be serviced in a separate thread, a thread pool. + None if RPC methods should be serviced in the thread belonging to the + run-time that calls for their service. + + Returns: + A TestServiceDigest synthesized from the given service.TestService. + """ + names = set() + + unary_unary = _assemble( + service.unary_unary_scenarios(), names, _InlineUnaryUnaryMethod, + _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool) + names.update(set(unary_unary.inlines)) + + unary_stream = _assemble( + service.unary_stream_scenarios(), names, _InlineUnaryStreamMethod, + _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool) + names.update(set(unary_stream.inlines)) + + stream_unary = _assemble( + service.stream_unary_scenarios(), names, _InlineStreamUnaryMethod, + _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool) + names.update(set(stream_unary.inlines)) + + stream_stream = _assemble( + service.stream_stream_scenarios(), names, _InlineStreamStreamMethod, + _EventStreamStreamMethod, _IDENTITY, control, pool) + names.update(set(stream_stream.inlines)) + + methods = list(unary_unary.methods) + methods.extend(unary_stream.methods) + methods.extend(stream_unary.methods) + methods.extend(stream_stream.methods) + adaptations = dict(unary_unary.adaptations) + adaptations.update(unary_stream.adaptations) + adaptations.update(stream_unary.adaptations) + adaptations.update(stream_stream.adaptations) + + return TestServiceDigest( + service.name(), + methods, + unary_unary.inlines, + unary_stream.inlines, + stream_unary.inlines, + stream_stream.inlines, + unary_unary.events, + unary_stream.events, + stream_unary.events, + stream_stream.events, + _MultiMethod(adaptations, control, pool), + unary_unary.messages, + unary_stream.messages, + stream_unary.messages, + stream_stream.messages) diff --git a/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py new file mode 100644 index 0000000000..dba73a9368 --- /dev/null +++ b/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py @@ -0,0 +1,367 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A test to verify an implementation of the Face layer of RPC Framework.""" + +import abc +import unittest + +from _framework.face import interfaces +from _framework.face.testing import callback as testing_callback +from _framework.face.testing import control +from _framework.face.testing import coverage +from _framework.face.testing import digest +from _framework.face.testing import stock_service +from _framework.face.testing import test_case + +_TIMEOUT = 3 + + +class EventInvocationSynchronousEventServiceTestCase( + test_case.FaceTestCase, coverage.FullCoverage): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must also extend unittest.TestCase. + """ + __metaclass__ = abc.ABCMeta + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self.control = control.PauseFailControl() + self.digest = digest.digest( + stock_service.STOCK_TEST_SERVICE, self.control, None) + + self.server, self.stub, self.memo = self.set_up_implementation( + self.digest.name, self.digest.methods, + {}, {}, {}, {}, + self.digest.event_unary_unary_methods, + self.digest.event_unary_stream_methods, + self.digest.event_stream_unary_methods, + self.digest.event_stream_stream_methods, + None) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.tear_down_implementation(self.memo) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, _TIMEOUT) + callback.block_until_terminated() + response = callback.response() + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, _TIMEOUT) + callback.block_until_terminated() + responses = callback.responses() + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + unused_call, request_consumer = self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + response = callback.response() + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + unused_call, request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + responses = callback.responses() + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + # pylint: disable=cell-var-from-loop + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + first_callback = testing_callback.Callback() + second_callback = testing_callback.Callback() + + def make_second_invocation(first_response): + first_callback.complete(first_response) + self.stub.event_value_in_value_out( + name, second_request, second_callback.complete, + second_callback.abort, _TIMEOUT) + + self.stub.event_value_in_value_out( + name, first_request, make_second_invocation, first_callback.abort, + _TIMEOUT) + second_callback.block_until_terminated() + + first_response = first_callback.response() + second_response = second_callback.response() + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + def testExpiredUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.pause(): + self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, _TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.EXPIRED, callback.abortion()) + + def testExpiredUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.pause(): + self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, _TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.EXPIRED, callback.abortion()) + + def testExpiredStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for unused_test_messages in test_messages_sequence: + callback = testing_callback.Callback() + + self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, _TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.EXPIRED, callback.abortion()) + + def testExpiredStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + unused_call, request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + callback.block_until_terminated() + + self.assertEqual(interfaces.EXPIRED, callback.abortion()) + + def testFailedUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.fail(): + self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, _TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + + def testFailedUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.fail(): + self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, _TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + + def testFailedStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + with self.control.fail(): + unused_call, request_consumer = self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + + self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + + def testFailedStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + with self.control.fail(): + unused_call, request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + + self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + + def testParallelInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + first_callback = testing_callback.Callback() + second_request = test_messages.request() + second_callback = testing_callback.Callback() + + self.stub.event_value_in_value_out( + name, first_request, first_callback.complete, first_callback.abort, + _TIMEOUT) + self.stub.event_value_in_value_out( + name, second_request, second_callback.complete, + second_callback.abort, _TIMEOUT) + first_callback.block_until_terminated() + second_callback.block_until_terminated() + + first_response = first_callback.response() + second_response = second_callback.response() + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + @unittest.skip('TODO(nathaniel): implement.') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + def testCancelledUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.pause(): + call = self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, _TIMEOUT) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.CANCELLED, callback.abortion()) + + def testCancelledUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + call = self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, _TIMEOUT) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.CANCELLED, callback.abortion()) + + def testCancelledStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + call, request_consumer = self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.CANCELLED, callback.abortion()) + + def testCancelledStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for unused_test_messages in test_messages_sequence: + callback = testing_callback.Callback() + + call, unused_request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, _TIMEOUT) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.CANCELLED, callback.abortion()) diff --git a/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py new file mode 100644 index 0000000000..cf8b2eeb95 --- /dev/null +++ b/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -0,0 +1,377 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A test to verify an implementation of the Face layer of RPC Framework.""" + +import abc +import contextlib +import threading +import unittest + +from _framework.face import exceptions +from _framework.face.testing import control +from _framework.face.testing import coverage +from _framework.face.testing import digest +from _framework.face.testing import stock_service +from _framework.face.testing import test_case +from _framework.foundation import future +from _framework.foundation import logging_pool + +_TIMEOUT = 3 +_MAXIMUM_POOL_SIZE = 100 + + +class _PauseableIterator(object): + + def __init__(self, upstream): + self._upstream = upstream + self._condition = threading.Condition() + self._paused = False + + @contextlib.contextmanager + def pause(self): + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + def __iter__(self): + return self + + def next(self): + with self._condition: + while self._paused: + self._condition.wait() + return next(self._upstream) + + +class FutureInvocationAsynchronousEventServiceTestCase( + test_case.FaceTestCase, coverage.FullCoverage): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must also extend unittest.TestCase. + """ + __metaclass__ = abc.ABCMeta + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self.control = control.PauseFailControl() + self.digest_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE) + self.digest = digest.digest( + stock_service.STOCK_TEST_SERVICE, self.control, self.digest_pool) + + self.server, self.stub, self.memo = self.set_up_implementation( + self.digest.name, self.digest.methods, + {}, {}, {}, {}, + self.digest.event_unary_unary_methods, + self.digest.event_unary_stream_methods, + self.digest.event_stream_unary_methods, + self.digest.event_stream_stream_methods, + None) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.tear_down_implementation(self.memo) + self.digest_pool.shutdown(wait=True) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_future = self.stub.future_value_in_value_out( + name, request, _TIMEOUT) + response = response_future.outcome().return_value + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_future = self.stub.future_stream_in_value_out( + name, request_iterator, _TIMEOUT) + response = response_future.outcome().return_value + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_iterator = self.stub.inline_stream_in_stream_out( + name, request_iterator, _TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self.stub.future_value_in_value_out( + name, first_request, _TIMEOUT) + first_response = first_response_future.outcome().return_value + + test_messages.verify(first_request, first_response, self) + + second_response_future = self.stub.future_value_in_value_out( + name, second_request, _TIMEOUT) + second_response = second_response_future.outcome().return_value + + test_messages.verify(second_request, second_response, self) + + def testExpiredUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(): + response_future = self.stub.future_value_in_value_out( + name, request, _TIMEOUT) + outcome = response_future.outcome() + + self.assertIsInstance( + outcome.exception, exceptions.ExpirationError) + + def testExpiredUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(): + response_future = self.stub.future_stream_in_value_out( + name, iter(requests), _TIMEOUT) + outcome = response_future.outcome() + + self.assertIsInstance( + outcome.exception, exceptions.ExpirationError) + + def testExpiredStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.fail(): + response_future = self.stub.future_value_in_value_out( + name, request, _TIMEOUT) + outcome = response_future.outcome() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_callback before the + # expiration of the RPC. + self.assertIsInstance(outcome.exception, exceptions.ExpirationError) + + def testFailedUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self.control.fail(), self.assertRaises(exceptions.ExpirationError): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.fail(): + response_future = self.stub.future_stream_in_value_out( + name, iter(requests), _TIMEOUT) + outcome = response_future.outcome() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_callback before the + # expiration of the RPC. + self.assertIsInstance(outcome.exception, exceptions.ExpirationError) + + def testFailedStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self.control.fail(), self.assertRaises(exceptions.ExpirationError): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + list(response_iterator) + + def testParallelInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self.stub.future_value_in_value_out( + name, first_request, _TIMEOUT) + second_response_future = self.stub.future_value_in_value_out( + name, second_request, _TIMEOUT) + first_response = first_response_future.outcome().return_value + second_response = second_response_future.outcome().return_value + + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + @unittest.skip('TODO(nathaniel): implement.') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + def testCancelledUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(): + response_future = self.stub.future_value_in_value_out( + name, request, _TIMEOUT) + cancelled = response_future.cancel() + + self.assertFalse(cancelled) + self.assertEqual(future.ABORTED, response_future.outcome().category) + + def testCancelledUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(exceptions.CancellationError): + next(response_iterator) + + def testCancelledStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(): + response_future = self.stub.future_stream_in_value_out( + name, iter(requests), _TIMEOUT) + cancelled = response_future.cancel() + + self.assertFalse(cancelled) + self.assertEqual(future.ABORTED, response_future.outcome().category) + + def testCancelledStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(exceptions.CancellationError): + next(response_iterator) diff --git a/src/python/_framework/face/testing/interfaces.py b/src/python/_framework/face/testing/interfaces.py new file mode 100644 index 0000000000..253f6f118d --- /dev/null +++ b/src/python/_framework/face/testing/interfaces.py @@ -0,0 +1,117 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces implemented by data sets used in Face-layer tests.""" + +import abc + +# cardinality is referenced from specification in this module. +from _framework.common import cardinality # pylint: disable=unused-import + + +class Method(object): + """An RPC method to be used in tests of RPC implementations.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def name(self): + """Identify the name of the method. + + Returns: + The name of the method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cardinality(self): + """Identify the cardinality of the method. + + Returns: + A cardinality.Cardinality value describing the streaming semantics of the + method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def request_class(self): + """Identify the class used for the method's request objects. + + Returns: + The class object of the class to which the method's request objects + belong. + """ + raise NotImplementedError() + + @abc.abstractmethod + def response_class(self): + """Identify the class used for the method's response objects. + + Returns: + The class object of the class to which the method's response objects + belong. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_request(self, request): + """Serialize the given request object. + + Args: + request: A request object appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_request(self, serialized_request): + """Synthesize a request object from a given bytestring. + + Args: + serialized_request: A bytestring deserializable into a request object + appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_response(self, response): + """Serialize the given response object. + + Args: + response: A response object appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_response(self, serialized_response): + """Synthesize a response object from a given bytestring. + + Args: + serialized_response: A bytestring deserializable into a response object + appropriate for this method. + """ + raise NotImplementedError() diff --git a/src/python/_framework/face/testing/serial.py b/src/python/_framework/face/testing/serial.py new file mode 100644 index 0000000000..47fc5822de --- /dev/null +++ b/src/python/_framework/face/testing/serial.py @@ -0,0 +1,70 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utility for serialization in the context of test RPC services.""" + +import collections + + +class Serialization( + collections.namedtuple( + '_Serialization', + ['request_serializers', + 'request_deserializers', + 'response_serializers', + 'response_deserializers'])): + """An aggregation of serialization behaviors for an RPC service. + + Attributes: + request_serializers: A dict from method name to request object serializer + behavior. + request_deserializers: A dict from method name to request object + deserializer behavior. + response_serializers: A dict from method name to response object serializer + behavior. + response_deserializers: A dict from method name to response object + deserializer behavior. + """ + + +def serialization(methods): + """Creates a Serialization from a sequences of interfaces.Method objects.""" + request_serializers = {} + request_deserializers = {} + response_serializers = {} + response_deserializers = {} + for method in methods: + name = method.name() + request_serializers[name] = method.serialize_request + request_deserializers[name] = method.deserialize_request + response_serializers[name] = method.serialize_response + response_deserializers[name] = method.deserialize_response + return Serialization( + request_serializers, request_deserializers, response_serializers, + response_deserializers) diff --git a/src/python/_framework/face/testing/service.py b/src/python/_framework/face/testing/service.py new file mode 100644 index 0000000000..771346ec2e --- /dev/null +++ b/src/python/_framework/face/testing/service.py @@ -0,0 +1,337 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Private interfaces implemented by data sets used in Face-layer tests.""" + +import abc + +# interfaces is referenced from specification in this module. +from _framework.face import interfaces as face_interfaces # pylint: disable=unused-import +from _framework.face.testing import interfaces + + +class UnaryUnaryTestMethod(interfaces.Method): + """Like face_interfaces.EventValueInValueOutMethod but with a control.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_callback, context, control): + """Services an RPC that accepts one message and produces one message. + + Args: + request: The single request message for the RPC. + response_callback: A callback to be called to accept the response message + of the RPC. + context: An face_interfaces.RpcContext object. + control: A test_control.Control to control execution of this method. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class UnaryUnaryTestMessages(object): + """A type for unary-request-unary-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def request(self): + """Affords a request message. + + Implementations of this method should return a different message with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A request message. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, request, response, test_case): + """Verifies that the computed response matches the given request. + + Args: + request: A request message. + response: A response message. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the request and response do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class UnaryStreamTestMethod(interfaces.Method): + """Like face_interfaces.EventValueInStreamOutMethod but with a control.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_consumer, context, control): + """Services an RPC that takes one message and produces a stream of messages. + + Args: + request: The single request message for the RPC. + response_consumer: A stream.Consumer to be called to accept the response + messages of the RPC. + context: An RpcContext object. + control: A test_control.Control to control execution of this method. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class UnaryStreamTestMessages(object): + """A type for unary-request-stream-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def request(self): + """Affords a request message. + + Implementations of this method should return a different message with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A request message. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, request, responses, test_case): + """Verifies that the computed responses match the given request. + + Args: + request: A request message. + responses: A sequence of response messages. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the request and responses do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class StreamUnaryTestMethod(interfaces.Method): + """Like face_interfaces.EventStreamInValueOutMethod but with a control.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_callback, context, control): + """Services an RPC that takes a stream of messages and produces one message. + + Args: + response_callback: A callback to be called to accept the response message + of the RPC. + context: An RpcContext object. + control: A test_control.Control to control execution of this method. + + Returns: + A stream.Consumer with which to accept the request messages of the RPC. + The consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing messages to this object. Implementations must not assume that + this object will be called to completion of the request stream or even + called at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class StreamUnaryTestMessages(object): + """A type for stream-request-unary-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. + + Implementations of this method should return a different sequences with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A sequence of request messages. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, requests, response, test_case): + """Verifies that the computed response matches the given requests. + + Args: + requests: A sequence of request messages. + response: A response message. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the requests and response do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class StreamStreamTestMethod(interfaces.Method): + """Like face_interfaces.EventStreamInStreamOutMethod but with a control.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_consumer, context, control): + """Services an RPC that accepts and produces streams of messages. + + Args: + response_consumer: A stream.Consumer to be called to accept the response + messages of the RPC. + context: An RpcContext object. + control: A test_control.Control to control execution of this method. + + Returns: + A stream.Consumer with which to accept the request messages of the RPC. + The consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing messages to this object. Implementations must not assume that + this object will be called to completion of the request stream or even + called at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class StreamStreamTestMessages(object): + """A type for stream-request-stream-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. + + Implementations of this method should return a different sequences with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A sequence of request messages. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, requests, responses, test_case): + """Verifies that the computed response matches the given requests. + + Args: + requests: A sequence of request messages. + responses: A sequence of response messages. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the requests and responses do not match, indicating + that there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class TestService(object): + """A specification of implemented RPC methods to use in tests.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def name(self): + """Identifies the RPC service name used during the test. + + Returns: + The RPC service name to be used for the test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unary_unary_scenarios(self): + """Affords unary-request-unary-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair + is a UnaryUnaryTestMethod object and the second element is a sequence + of UnaryUnaryTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unary_stream_scenarios(self): + """Affords unary-request-stream-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair is a + UnaryStreamTestMethod object and the second element is a sequence of + UnaryStreamTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_unary_scenarios(self): + """Affords stream-request-unary-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair is a + StreamUnaryTestMethod object and the second element is a sequence of + StreamUnaryTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_stream_scenarios(self): + """Affords stream-request-stream-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair is a + StreamStreamTestMethod object and the second element is a sequence of + StreamStreamTestMethodMessages objects. + """ + raise NotImplementedError() diff --git a/src/python/_framework/face/testing/stock_service.py b/src/python/_framework/face/testing/stock_service.py new file mode 100644 index 0000000000..bd82877e83 --- /dev/null +++ b/src/python/_framework/face/testing/stock_service.py @@ -0,0 +1,374 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Examples of Python implementations of the stock.proto Stock service.""" + +from _framework.common import cardinality +from _framework.face.testing import service +from _framework.foundation import abandonment +from _framework.foundation import stream +from _framework.foundation import stream_util +from _junkdrawer import stock_pb2 + +SYMBOL_FORMAT = 'test symbol:%03d' +STREAM_LENGTH = 400 + +# A test-appropriate security-pricing function. :-P +_price = lambda symbol_name: float(hash(symbol_name) % 4096) + + +def _get_last_trade_price(stock_request, stock_reply_callback, control, active): + """A unary-request, unary-response test method.""" + control.control() + if active(): + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price(stock_request.symbol))) + else: + raise abandonment.Abandoned() + + +def _get_last_trade_price_multiple(stock_reply_consumer, control, active): + """A stream-request, stream-response test method.""" + def stock_reply_for_stock_request(stock_request): + control.control() + if active(): + return stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price(stock_request.symbol)) + else: + raise abandonment.Abandoned() + return stream_util.TransformingConsumer( + stock_reply_for_stock_request, stock_reply_consumer) + + +def _watch_future_trades(stock_request, stock_reply_consumer, control, active): + """A unary-request, stream-response test method.""" + base_price = _price(stock_request.symbol) + for index in range(stock_request.num_trades_to_watch): + control.control() + if active(): + stock_reply_consumer.consume( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=base_price + index)) + else: + raise abandonment.Abandoned() + stock_reply_consumer.terminate() + + +def _get_highest_trade_price(stock_reply_callback, control, active): + """A stream-request, unary-response test method.""" + + class StockRequestConsumer(stream.Consumer): + """Keeps an ongoing record of the most valuable symbol yet consumed.""" + + def __init__(self): + self._symbol = None + self._price = None + + def consume(self, stock_request): + control.control() + if active(): + if self._price is None: + self._symbol = stock_request.symbol + self._price = _price(stock_request.symbol) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + self._symbol = stock_request.symbol + self._price = candidate_price + + def terminate(self): + control.control() + if active(): + if self._symbol is None: + raise ValueError() + else: + stock_reply_callback( + stock_pb2.StockReply(symbol=self._symbol, price=self._price)) + self._symbol = None + self._price = None + + def consume_and_terminate(self, stock_request): + control.control() + if active(): + if self._price is None: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, + price=_price(stock_request.symbol))) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=candidate_price)) + else: + stock_reply_callback( + stock_pb2.StockReply( + symbol=self._symbol, price=self._price)) + + self._symbol = None + self._price = None + + return StockRequestConsumer() + + +class GetLastTradePrice(service.UnaryUnaryTestMethod): + """GetLastTradePrice for use in tests.""" + + def name(self): + return 'GetLastTradePrice' + + def cardinality(self): + return cardinality.Cardinality.UNARY_UNARY + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, request, response_callback, context, control): + _get_last_trade_price( + request, response_callback, control, context.is_active) + + +class GetLastTradePriceMessages(service.UnaryUnaryTestMessages): + + def __init__(self): + self._index = 0 + + def request(self): + symbol = SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest(symbol=symbol) + + def verify(self, request, response, test_case): + test_case.assertEqual(request.symbol, response.symbol) + test_case.assertEqual(_price(request.symbol), response.price) + + +class GetLastTradePriceMultiple(service.StreamStreamTestMethod): + """GetLastTradePriceMultiple for use in tests.""" + + def name(self): + return 'GetLastTradePriceMultiple' + + def cardinality(self): + return cardinality.Cardinality.STREAM_STREAM + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, response_consumer, context, control): + return _get_last_trade_price_multiple( + response_consumer, control, context.is_active) + + +class GetLastTradePriceMultipleMessages(service.StreamStreamTestMessages): + """Pairs of message streams for use with GetLastTradePriceMultiple.""" + + def __init__(self): + self._index = 0 + + def requests(self): + base_index = self._index + self._index += 1 + return [ + stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % (base_index + index)) + for index in range(STREAM_LENGTH)] + + def verify(self, requests, responses, test_case): + test_case.assertEqual(len(requests), len(responses)) + for stock_request, stock_reply in zip(requests, responses): + test_case.assertEqual(stock_request.symbol, stock_reply.symbol) + test_case.assertEqual(_price(stock_request.symbol), stock_reply.price) + + +class WatchFutureTrades(service.UnaryStreamTestMethod): + """WatchFutureTrades for use in tests.""" + + def name(self): + return 'WatchFutureTrades' + + def cardinality(self): + return cardinality.Cardinality.UNARY_STREAM + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, request, response_consumer, context, control): + _watch_future_trades(request, response_consumer, control, context.is_active) + + +class WatchFutureTradesMessages(service.UnaryStreamTestMessages): + """Pairs of a single request message and a sequence of response messages.""" + + def __init__(self): + self._index = 0 + + def request(self): + symbol = SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest( + symbol=symbol, num_trades_to_watch=STREAM_LENGTH) + + def verify(self, request, responses, test_case): + test_case.assertEqual(STREAM_LENGTH, len(responses)) + base_price = _price(request.symbol) + for index, response in enumerate(responses): + test_case.assertEqual(base_price + index, response.price) + + +class GetHighestTradePrice(service.StreamUnaryTestMethod): + """GetHighestTradePrice for use in tests.""" + + def name(self): + return 'GetHighestTradePrice' + + def cardinality(self): + return cardinality.Cardinality.STREAM_UNARY + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, response_callback, context, control): + return _get_highest_trade_price( + response_callback, control, context.is_active) + + +class GetHighestTradePriceMessages(service.StreamUnaryTestMessages): + + def requests(self): + return [ + stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % index) + for index in range(STREAM_LENGTH)] + + def verify(self, requests, response, test_case): + price = None + symbol = None + for stock_request in requests: + current_symbol = stock_request.symbol + current_price = _price(current_symbol) + if price is None or price < current_price: + price = current_price + symbol = current_symbol + test_case.assertEqual(price, response.price) + test_case.assertEqual(symbol, response.symbol) + + +class StockTestService(service.TestService): + """A corpus of test data with one method of each RPC cardinality.""" + + def name(self): + return 'Stock' + + def unary_unary_scenarios(self): + return { + 'GetLastTradePrice': ( + GetLastTradePrice(), [GetLastTradePriceMessages()]), + } + + def unary_stream_scenarios(self): + return { + 'WatchFutureTrades': ( + WatchFutureTrades(), [WatchFutureTradesMessages()]), + } + + def stream_unary_scenarios(self): + return { + 'GetHighestTradePrice': ( + GetHighestTradePrice(), [GetHighestTradePriceMessages()]) + } + + def stream_stream_scenarios(self): + return { + 'GetLastTradePriceMultiple': ( + GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]), + } + + +STOCK_TEST_SERVICE = StockTestService() diff --git a/src/python/_framework/face/testing/test_case.py b/src/python/_framework/face/testing/test_case.py new file mode 100644 index 0000000000..09b5a67f5a --- /dev/null +++ b/src/python/_framework/face/testing/test_case.py @@ -0,0 +1,111 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tools for creating tests of implementations of the Face layer.""" + +import abc + +# face_interfaces and interfaces are referenced in specification in this module. +from _framework.face import interfaces as face_interfaces # pylint: disable=unused-import +from _framework.face.testing import interfaces # pylint: disable=unused-import + + +class FaceTestCase(object): + """Describes a test of the Face Layer of RPC Framework. + + Concrete subclasses must also inherit from unittest.TestCase and from at least + one class that defines test methods. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_up_implementation( + self, + name, + methods, + inline_value_in_value_out_methods, + inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods, + event_value_in_value_out_methods, + event_value_in_stream_out_methods, + event_stream_in_value_out_methods, + event_stream_in_stream_out_methods, + multi_method): + """Instantiates the Face Layer implementation under test. + + Args: + name: The service name to be used in the test. + methods: A sequence of interfaces.Method objects describing the RPC + methods that will be called during the test. + inline_value_in_value_out_methods: A dictionary from string method names + to face_interfaces.InlineValueInValueOutMethod implementations of those + methods. + inline_value_in_stream_out_methods: A dictionary from string method names + to face_interfaces.InlineValueInStreamOutMethod implementations of those + methods. + inline_stream_in_value_out_methods: A dictionary from string method names + to face_interfaces.InlineStreamInValueOutMethod implementations of those + methods. + inline_stream_in_stream_out_methods: A dictionary from string method names + to face_interfaces.InlineStreamInStreamOutMethod implementations of + those methods. + event_value_in_value_out_methods: A dictionary from string method names + to face_interfaces.EventValueInValueOutMethod implementations of those + methods. + event_value_in_stream_out_methods: A dictionary from string method names + to face_interfaces.EventValueInStreamOutMethod implementations of those + methods. + event_stream_in_value_out_methods: A dictionary from string method names + to face_interfaces.EventStreamInValueOutMethod implementations of those + methods. + event_stream_in_stream_out_methods: A dictionary from string method names + to face_interfaces.EventStreamInStreamOutMethod implementations of those + methods. + multi_method: An face_interfaces.MultiMethod, or None. + + Returns: + A sequence of length three the first element of which is a + face_interfaces.Server, the second element of which is a + face_interfaces.Stub, (both of which are backed by the given method + implementations), and the third element of which is an arbitrary memo + object to be kept and passed to tearDownImplementation at the conclusion + of the test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def tear_down_implementation(self, memo): + """Destroys the Face layer implementation under test. + + Args: + memo: The object from the third position of the return value of + set_up_implementation. + """ + raise NotImplementedError() |