diff options
author | Nicolas Noble <nnoble@google.com> | 2015-01-26 11:41:12 -0800 |
---|---|---|
committer | Nicolas Noble <nnoble@google.com> | 2015-01-26 11:41:12 -0800 |
commit | fee065c1c7f01207c0e484c92681cea184b1983a (patch) | |
tree | 11baf429fdee72a169bdb0b1d17f16154ab5186a /src/python | |
parent | 614c2bf99b1865761e5a05a56faf1ad8f26138ff (diff) | |
parent | c41704bada0dc8974e7063f84dae934931813811 (diff) |
Merge branch 'master' of github.com:google/grpc into json
Conflicts:
src/core/security/credentials.c
Diffstat (limited to 'src/python')
67 files changed, 9754 insertions, 0 deletions
diff --git a/src/python/__init__.py b/src/python/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/__init__.py diff --git a/src/python/_framework/__init__.py b/src/python/_framework/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/_framework/__init__.py diff --git a/src/python/_framework/base/__init__.py b/src/python/_framework/base/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/_framework/base/__init__.py diff --git a/src/python/_framework/base/exceptions.py b/src/python/_framework/base/exceptions.py new file mode 100644 index 0000000000..b8f4752184 --- /dev/null +++ b/src/python/_framework/base/exceptions.py @@ -0,0 +1,34 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Exceptions defined and used by the base layer of RPC Framework.""" + + +class NoSuchMethodError(Exception): + """Indicates that an operation with an unrecognized name has been called.""" diff --git a/src/python/_framework/base/interfaces.py b/src/python/_framework/base/interfaces.py new file mode 100644 index 0000000000..de7137cbf7 --- /dev/null +++ b/src/python/_framework/base/interfaces.py @@ -0,0 +1,229 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces defined and used by the base layer of RPC Framework.""" + +# TODO(nathaniel): Use Python's new enum library for enumerated types rather +# than constants merely placed close together. + +import abc + +# stream is referenced from specification in this module. +from _framework.foundation import stream # pylint: disable=unused-import + +# Operation outcomes. +COMPLETED = 'completed' +CANCELLED = 'cancelled' +EXPIRED = 'expired' +RECEPTION_FAILURE = 'reception failure' +TRANSMISSION_FAILURE = 'transmission failure' +SERVICER_FAILURE = 'servicer failure' +SERVICED_FAILURE = 'serviced failure' + +# Subscription categories. +FULL = 'full' +TERMINATION_ONLY = 'termination only' +NONE = 'none' + + +class OperationContext(object): + """Provides operation-related information and action. + + Attributes: + trace_id: A uuid.UUID identifying a particular set of related operations. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def is_active(self): + """Describes whether the operation is active or has terminated.""" + raise NotImplementedError() + + @abc.abstractmethod + def add_termination_callback(self, callback): + """Adds a function to be called upon operation termination. + + Args: + callback: A callable that will be passed one of COMPLETED, CANCELLED, + EXPIRED, RECEPTION_FAILURE, TRANSMISSION_FAILURE, SERVICER_FAILURE, or + SERVICED_FAILURE. + """ + raise NotImplementedError() + + @abc.abstractmethod + def time_remaining(self): + """Describes the length of allowed time remaining for the operation. + + Returns: + A nonnegative float indicating the length of allowed time in seconds + remaining for the operation to complete before it is considered to have + timed out. + """ + raise NotImplementedError() + + @abc.abstractmethod + def fail(self, exception): + """Indicates that the operation has failed. + + Args: + exception: An exception germane to the operation failure. May be None. + """ + raise NotImplementedError() + + +class Servicer(object): + """Interface for service implementations.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, name, context, output_consumer): + """Services an operation. + + Args: + name: The name of the operation. + context: A ServicerContext object affording contextual information and + actions. + output_consumer: A stream.Consumer that will accept output values of + the operation. + + Returns: + A stream.Consumer that will accept input values for the operation. + + Raises: + exceptions.NoSuchMethodError: If this Servicer affords no method with the + given name. + abandonment.Abandoned: If the operation has been aborted and there no + longer is any reason to service the operation. + """ + raise NotImplementedError() + + +class Operation(object): + """Representation of an in-progress operation. + + Attributes: + consumer: A stream.Consumer into which payloads constituting the operation's + input may be passed. + context: An OperationContext affording information and action about the + operation. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def cancel(self): + """Cancels this operation.""" + raise NotImplementedError() + + +class ServicedIngestor(object): + """Responsible for accepting the result of an operation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def consumer(self, operation_context): + """Affords a consumer to which operation results will be passed. + + Args: + operation_context: An OperationContext object for the current operation. + + Returns: + A stream.Consumer to which the results of the current operation will be + passed. + + Raises: + abandonment.Abandoned: If the operation has been aborted and there no + longer is any reason to service the operation. + """ + raise NotImplementedError() + + +class ServicedSubscription(object): + """A sum type representing a serviced's interest in an operation. + + Attributes: + category: One of FULL, TERMINATION_ONLY, or NONE. + ingestor: A ServicedIngestor. Must be present if category is FULL. + """ + __metaclass__ = abc.ABCMeta + + +class End(object): + """Common type for entry-point objects on both sides of an operation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def operation_stats(self): + """Reports the number of terminated operations broken down by outcome. + + Returns: + A dictionary from operation outcome constant (COMPLETED, CANCELLED, + EXPIRED, and so on) to an integer representing the number of operations + that terminated with that outcome. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_idle_action(self, action): + """Adds an action to be called when this End has no ongoing operations. + + Args: + action: A callable that accepts no arguments. + """ + raise NotImplementedError() + + +class Front(End): + """Clientish objects that afford the invocation of operations.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def operate( + self, name, payload, complete, timeout, subscription, trace_id): + """Commences an operation. + + Args: + name: The name of the method invoked for the operation. + payload: An initial payload for the operation. May be None. + complete: A boolean indicating whether or not additional payloads to be + sent to the servicer may be supplied after this call. + timeout: A length of time in seconds to allow for the operation. + subscription: A ServicedSubscription for the operation. + trace_id: A uuid.UUID identifying a set of related operations to which + this operation belongs. + + Returns: + An Operation object affording information and action about the operation + in progress. + """ + raise NotImplementedError() + + +class Back(End): + """Serverish objects that perform the work of operations.""" + __metaclass__ = abc.ABCMeta diff --git a/src/python/_framework/base/interfaces_test.py b/src/python/_framework/base/interfaces_test.py new file mode 100644 index 0000000000..6eb07ea505 --- /dev/null +++ b/src/python/_framework/base/interfaces_test.py @@ -0,0 +1,299 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Abstract tests against the interfaces of the base layer of RPC Framework.""" + +import threading +import time + +from _framework.base import interfaces +from _framework.base import util +from _framework.foundation import stream +from _framework.foundation import stream_testing +from _framework.foundation import stream_util + +TICK = 0.1 +SMALL_TIMEOUT = TICK * 50 +STREAM_LENGTH = 100 + +SYNCHRONOUS_ECHO = 'synchronous echo' +ASYNCHRONOUS_ECHO = 'asynchronous echo' +IMMEDIATE_FAILURE = 'immediate failure' +TRIGGERED_FAILURE = 'triggered failure' +WAIT_ON_CONDITION = 'wait on condition' + +EMPTY_OUTCOME_DICT = { + interfaces.COMPLETED: 0, + interfaces.CANCELLED: 0, + interfaces.EXPIRED: 0, + interfaces.RECEPTION_FAILURE: 0, + interfaces.TRANSMISSION_FAILURE: 0, + interfaces.SERVICER_FAILURE: 0, + interfaces.SERVICED_FAILURE: 0, + } + + +def _synchronous_echo(output_consumer): + return stream_util.TransformingConsumer(lambda x: x, output_consumer) + + +class AsynchronousEcho(stream.Consumer): + """A stream.Consumer that echoes its input to another stream.Consumer.""" + + def __init__(self, output_consumer, pool): + self._lock = threading.Lock() + self._output_consumer = output_consumer + self._pool = pool + + self._queue = [] + self._spinning = False + + def _spin(self, value, complete): + while True: + if value: + if complete: + self._output_consumer.consume_and_terminate(value) + else: + self._output_consumer.consume(value) + elif complete: + self._output_consumer.terminate() + with self._lock: + if self._queue: + value, complete = self._queue.pop(0) + else: + self._spinning = False + return + + def consume(self, value): + with self._lock: + if self._spinning: + self._queue.append((value, False)) + else: + self._spinning = True + self._pool.submit(self._spin, value, False) + + def terminate(self): + with self._lock: + if self._spinning: + self._queue.append((None, True)) + else: + self._spinning = True + self._pool.submit(self._spin, None, True) + + def consume_and_terminate(self, value): + with self._lock: + if self._spinning: + self._queue.append((value, True)) + else: + self._spinning = True + self._pool.submit(self._spin, value, True) + + +class TestServicer(interfaces.Servicer): + """An interfaces.Servicer with instrumented for testing.""" + + def __init__(self, pool): + self._pool = pool + self.condition = threading.Condition() + self._released = False + + def service(self, name, context, output_consumer): + if name == SYNCHRONOUS_ECHO: + return _synchronous_echo(output_consumer) + elif name == ASYNCHRONOUS_ECHO: + return AsynchronousEcho(output_consumer, self._pool) + elif name == IMMEDIATE_FAILURE: + raise ValueError() + elif name == TRIGGERED_FAILURE: + raise NotImplementedError + elif name == WAIT_ON_CONDITION: + with self.condition: + while not self._released: + self.condition.wait() + return _synchronous_echo(output_consumer) + else: + raise NotImplementedError() + + def release(self): + with self.condition: + self._released = True + self.condition.notify_all() + + +class EasyServicedIngestor(interfaces.ServicedIngestor): + """A trivial implementation of interfaces.ServicedIngestor.""" + + def __init__(self, consumer): + self._consumer = consumer + + def consumer(self, operation_context): + """See interfaces.ServicedIngestor.consumer for specification.""" + return self._consumer + + +class FrontAndBackTest(object): + """A test suite usable against any joined Front and Back.""" + + # Pylint doesn't know that this is a unittest.TestCase mix-in. + # pylint: disable=invalid-name + + def testSimplestCall(self): + """Tests the absolute simplest call - a one-packet fire-and-forget.""" + self.front.operate( + SYNCHRONOUS_ECHO, None, True, SMALL_TIMEOUT, + util.none_serviced_subscription(), 'test trace ID') + util.wait_for_idle(self.front) + self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED]) + + # Assuming nothing really pathological (such as pauses on the order of + # SMALL_TIMEOUT interfering with this test) there are a two different ways + # the back could have experienced execution up to this point: + # (1) The packet is still either in the front waiting to be transmitted + # or is somewhere on the link between the front and the back. The back has + # no idea that this test is even happening. Calling wait_for_idle on it + # would do no good because in this case the back is idle and the call would + # return with the packet bound for it still in the front or on the link. + back_operation_stats = self.back.operation_stats() + first_back_possibility = EMPTY_OUTCOME_DICT + # (2) The packet arrived at the back and the back completed the operation. + second_back_possibility = dict(EMPTY_OUTCOME_DICT) + second_back_possibility[interfaces.COMPLETED] = 1 + self.assertIn( + back_operation_stats, (first_back_possibility, second_back_possibility)) + # It's true that if the packet had arrived at the back and the back had + # begun processing that wait_for_idle could hold test execution until the + # back completed the operation, but that doesn't really collapse the + # possibility space down to one solution. + + def testEntireEcho(self): + """Tests a very simple one-packet-each-way round-trip.""" + test_payload = 'test payload' + test_consumer = stream_testing.TestConsumer() + subscription = util.full_serviced_subscription( + EasyServicedIngestor(test_consumer)) + + self.front.operate( + ASYNCHRONOUS_ECHO, test_payload, True, SMALL_TIMEOUT, subscription, + 'test trace ID') + + util.wait_for_idle(self.front) + util.wait_for_idle(self.back) + self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED]) + self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED]) + self.assertListEqual([(test_payload, True)], test_consumer.calls) + + def testBidirectionalStreamingEcho(self): + """Tests sending multiple packets each way.""" + test_payload_template = 'test_payload: %03d' + test_payloads = [test_payload_template % i for i in range(STREAM_LENGTH)] + test_consumer = stream_testing.TestConsumer() + subscription = util.full_serviced_subscription( + EasyServicedIngestor(test_consumer)) + + operation = self.front.operate( + SYNCHRONOUS_ECHO, None, False, SMALL_TIMEOUT, subscription, + 'test trace ID') + + for test_payload in test_payloads: + operation.consumer.consume(test_payload) + operation.consumer.terminate() + + util.wait_for_idle(self.front) + util.wait_for_idle(self.back) + self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED]) + self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED]) + self.assertListEqual(test_payloads, test_consumer.values()) + + def testCancellation(self): + """Tests cancelling a long-lived operation.""" + test_consumer = stream_testing.TestConsumer() + subscription = util.full_serviced_subscription( + EasyServicedIngestor(test_consumer)) + + operation = self.front.operate( + ASYNCHRONOUS_ECHO, None, False, SMALL_TIMEOUT, subscription, + 'test trace ID') + operation.cancel() + + util.wait_for_idle(self.front) + self.assertEqual(1, self.front.operation_stats()[interfaces.CANCELLED]) + util.wait_for_idle(self.back) + self.assertListEqual([], test_consumer.calls) + + # Assuming nothing really pathological (such as pauses on the order of + # SMALL_TIMEOUT interfering with this test) there are a two different ways + # the back could have experienced execution up to this point: + # (1) Both packets are still either in the front waiting to be transmitted + # or are somewhere on the link between the front and the back. The back has + # no idea that this test is even happening. Calling wait_for_idle on it + # would do no good because in this case the back is idle and the call would + # return with the packets bound for it still in the front or on the link. + back_operation_stats = self.back.operation_stats() + first_back_possibility = EMPTY_OUTCOME_DICT + # (2) Both packets arrived within SMALL_TIMEOUT of one another at the back. + # The back started processing based on the first packet and then stopped + # upon receiving the cancellation packet. + second_back_possibility = dict(EMPTY_OUTCOME_DICT) + second_back_possibility[interfaces.CANCELLED] = 1 + self.assertIn( + back_operation_stats, (first_back_possibility, second_back_possibility)) + + def testExpiration(self): + """Tests that operations time out.""" + timeout = TICK * 2 + allowance = TICK # How much extra time to + condition = threading.Condition() + test_payload = 'test payload' + subscription = util.termination_only_serviced_subscription() + start_time = time.time() + + outcome_cell = [None] + termination_time_cell = [None] + def termination_action(outcome): + with condition: + outcome_cell[0] = outcome + termination_time_cell[0] = time.time() + condition.notify() + + with condition: + operation = self.front.operate( + SYNCHRONOUS_ECHO, test_payload, False, timeout, subscription, + 'test trace ID') + operation.context.add_termination_callback(termination_action) + while outcome_cell[0] is None: + condition.wait() + + duration = termination_time_cell[0] - start_time + self.assertLessEqual(timeout, duration) + self.assertLess(duration, timeout + allowance) + self.assertEqual(interfaces.EXPIRED, outcome_cell[0]) + util.wait_for_idle(self.front) + self.assertEqual(1, self.front.operation_stats()[interfaces.EXPIRED]) + util.wait_for_idle(self.back) + self.assertLessEqual(1, self.back.operation_stats()[interfaces.EXPIRED]) diff --git a/src/python/_framework/base/packets/__init__.py b/src/python/_framework/base/packets/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/_framework/base/packets/__init__.py diff --git a/src/python/_framework/base/packets/_cancellation.py b/src/python/_framework/base/packets/_cancellation.py new file mode 100644 index 0000000000..49172d1b97 --- /dev/null +++ b/src/python/_framework/base/packets/_cancellation.py @@ -0,0 +1,64 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation cancellation.""" + +from _framework.base.packets import _interfaces +from _framework.base.packets import packets + + +class CancellationManager(_interfaces.CancellationManager): + """An implementation of _interfaces.CancellationManager.""" + + def __init__( + self, lock, termination_manager, transmission_manager, ingestion_manager, + expiration_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + ingestion_manager: The _interfaces.IngestionManager for the operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._ingestion_manager = ingestion_manager + self._expiration_manager = expiration_manager + + def cancel(self): + """See _interfaces.CancellationManager.cancel for specification.""" + with self._lock: + self._termination_manager.abort(packets.Kind.CANCELLATION) + self._transmission_manager.abort(packets.Kind.CANCELLATION) + self._ingestion_manager.abort() + self._expiration_manager.abort() diff --git a/src/python/_framework/base/packets/_constants.py b/src/python/_framework/base/packets/_constants.py new file mode 100644 index 0000000000..8fbdc82782 --- /dev/null +++ b/src/python/_framework/base/packets/_constants.py @@ -0,0 +1,32 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Private constants for the package.""" + +INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Base) internal error! :-(' diff --git a/src/python/_framework/base/packets/_context.py b/src/python/_framework/base/packets/_context.py new file mode 100644 index 0000000000..be390364b0 --- /dev/null +++ b/src/python/_framework/base/packets/_context.py @@ -0,0 +1,99 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation context.""" + +import time + +# _interfaces and packets are referenced from specification in this module. +from _framework.base import interfaces as base_interfaces +from _framework.base.packets import _interfaces # pylint: disable=unused-import +from _framework.base.packets import packets # pylint: disable=unused-import + + +class OperationContext(base_interfaces.OperationContext): + """An implementation of base_interfaces.OperationContext.""" + + def __init__( + self, lock, operation_id, local_failure, termination_manager, + transmission_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + operation_id: An object identifying the operation. + local_failure: Whichever one of packets.Kind.SERVICED_FAILURE or + packets.Kind.SERVICER_FAILURE describes local failure of customer code. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + """ + self._lock = lock + self._local_failure = local_failure + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._ingestion_manager = None + self._expiration_manager = None + + self.operation_id = operation_id + + def set_ingestion_and_expiration_managers( + self, ingestion_manager, expiration_manager): + """Sets managers with which this OperationContext cooperates. + + Args: + ingestion_manager: The _interfaces.IngestionManager for the operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + """ + self._ingestion_manager = ingestion_manager + self._expiration_manager = expiration_manager + + def is_active(self): + """See base_interfaces.OperationContext.is_active for specification.""" + with self._lock: + return self._termination_manager.is_active() + + def add_termination_callback(self, callback): + """See base_interfaces.OperationContext.add_termination_callback.""" + with self._lock: + self._termination_manager.add_callback(callback) + + def time_remaining(self): + """See interfaces.OperationContext.time_remaining for specification.""" + with self._lock: + deadline = self._expiration_manager.deadline() + return max(0.0, deadline - time.time()) + + def fail(self, exception): + """See interfaces.OperationContext.fail for specification.""" + with self._lock: + self._termination_manager.abort(self._local_failure) + self._transmission_manager.abort(self._local_failure) + self._ingestion_manager.abort() + self._expiration_manager.abort() diff --git a/src/python/_framework/base/packets/_emission.py b/src/python/_framework/base/packets/_emission.py new file mode 100644 index 0000000000..b4be5eb0ff --- /dev/null +++ b/src/python/_framework/base/packets/_emission.py @@ -0,0 +1,126 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for handling emitted values.""" + +# packets is referenced from specifications in this module. +from _framework.base.packets import _interfaces +from _framework.base.packets import packets # pylint: disable=unused-import + + +class _EmissionManager(_interfaces.EmissionManager): + """An implementation of _interfaces.EmissionManager.""" + + def __init__( + self, lock, failure_kind, termination_manager, transmission_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or + packets.Kind.SERVICER_FAILURE describes this object's methods being + called inappropriately by customer code. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + """ + self._lock = lock + self._failure_kind = failure_kind + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._ingestion_manager = None + self._expiration_manager = None + + self._emission_complete = False + + def set_ingestion_manager_and_expiration_manager( + self, ingestion_manager, expiration_manager): + self._ingestion_manager = ingestion_manager + self._expiration_manager = expiration_manager + + def _abort(self): + self._termination_manager.abort(self._failure_kind) + self._transmission_manager.abort(self._failure_kind) + self._ingestion_manager.abort() + self._expiration_manager.abort() + + def consume(self, value): + with self._lock: + if self._emission_complete: + self._abort() + else: + self._transmission_manager.inmit(value, False) + + def terminate(self): + with self._lock: + if not self._emission_complete: + self._termination_manager.emission_complete() + self._transmission_manager.inmit(None, True) + self._emission_complete = True + + def consume_and_terminate(self, value): + with self._lock: + if self._emission_complete: + self._abort() + else: + self._termination_manager.emission_complete() + self._transmission_manager.inmit(value, True) + self._emission_complete = True + + +def front_emission_manager(lock, termination_manager, transmission_manager): + """Creates an _interfaces.EmissionManager appropriate for front-side use. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the operation. + + Returns: + An _interfaces.EmissionManager appropriate for front-side use. + """ + return _EmissionManager( + lock, packets.Kind.SERVICED_FAILURE, termination_manager, + transmission_manager) + + +def back_emission_manager(lock, termination_manager, transmission_manager): + """Creates an _interfaces.EmissionManager appropriate for back-side use. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the operation. + + Returns: + An _interfaces.EmissionManager appropriate for back-side use. + """ + return _EmissionManager( + lock, packets.Kind.SERVICER_FAILURE, termination_manager, + transmission_manager) diff --git a/src/python/_framework/base/packets/_ends.py b/src/python/_framework/base/packets/_ends.py new file mode 100644 index 0000000000..baaf5cacf9 --- /dev/null +++ b/src/python/_framework/base/packets/_ends.py @@ -0,0 +1,408 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Implementations of Fronts and Backs.""" + +import collections +import threading +import uuid + +# _interfaces and packets are referenced from specification in this module. +from _framework.base import interfaces as base_interfaces +from _framework.base.packets import _cancellation +from _framework.base.packets import _context +from _framework.base.packets import _emission +from _framework.base.packets import _expiration +from _framework.base.packets import _ingestion +from _framework.base.packets import _interfaces # pylint: disable=unused-import +from _framework.base.packets import _reception +from _framework.base.packets import _termination +from _framework.base.packets import _transmission +from _framework.base.packets import interfaces +from _framework.base.packets import packets # pylint: disable=unused-import +from _framework.foundation import callable_util + +_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!' + +_OPERATION_OUTCOMES = ( + base_interfaces.COMPLETED, + base_interfaces.CANCELLED, + base_interfaces.EXPIRED, + base_interfaces.RECEPTION_FAILURE, + base_interfaces.TRANSMISSION_FAILURE, + base_interfaces.SERVICER_FAILURE, + base_interfaces.SERVICED_FAILURE, + ) + + +class _EasyOperation(base_interfaces.Operation): + """A trivial implementation of base_interfaces.Operation.""" + + def __init__(self, emission_manager, context, cancellation_manager): + """Constructor. + + Args: + emission_manager: The _interfaces.EmissionManager for the operation that + will accept values emitted by customer code. + context: The base_interfaces.OperationContext for use by the customer + during the operation. + cancellation_manager: The _interfaces.CancellationManager for the + operation. + """ + self.consumer = emission_manager + self.context = context + self._cancellation_manager = cancellation_manager + + def cancel(self): + self._cancellation_manager.cancel() + + +class _Endlette(object): + """Utility for stateful behavior common to Fronts and Backs.""" + + def __init__(self, pool): + """Constructor. + + Args: + pool: A thread pool to use when calling registered idle actions. + """ + self._lock = threading.Lock() + self._pool = pool + # Dictionary from operation IDs to ReceptionManager-or-None. A None value + # indicates an in-progress fire-and-forget operation for which the customer + # has chosen to ignore results. + self._operations = {} + self._stats = {outcome: 0 for outcome in _OPERATION_OUTCOMES} + self._idle_actions = [] + + def terminal_action(self, operation_id): + """Constructs the termination action for a single operation. + + Args: + operation_id: An operation ID. + + Returns: + A callable that takes an operation outcome for an argument to be used as + the termination action for the operation associated with the given + operation ID. + """ + def termination_action(outcome): + with self._lock: + self._stats[outcome] += 1 + self._operations.pop(operation_id, None) + if not self._operations: + for action in self._idle_actions: + self._pool.submit(callable_util.with_exceptions_logged( + action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)) + self._idle_actions = [] + return termination_action + + def __enter__(self): + self._lock.acquire() + + def __exit__(self, exc_type, exc_val, exc_tb): + self._lock.release() + + def get_operation(self, operation_id): + return self._operations.get(operation_id, None) + + def add_operation(self, operation_id, operation_reception_manager): + self._operations[operation_id] = operation_reception_manager + + def operation_stats(self): + with self._lock: + return dict(self._stats) + + def add_idle_action(self, action): + with self._lock: + if self._operations: + self._idle_actions.append(action) + else: + self._pool.submit(callable_util.with_exceptions_logged( + action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)) + + +class _FrontManagement( + collections.namedtuple( + '_FrontManagement', + ('reception', 'emission', 'operation', 'cancellation'))): + """Just a trivial helper class to bundle four fellow-traveling objects.""" + + +def _front_operate( + callback, work_pool, transmission_pool, utility_pool, + termination_action, operation_id, name, payload, complete, timeout, + subscription, trace_id): + """Constructs objects necessary for front-side operation management. + + Args: + callback: A callable that accepts packets.FrontToBackPackets and delivers + them to the other side of the operation. Execution of this callable may + take any arbitrary length of time. + work_pool: A thread pool in which to execute customer code. + transmission_pool: A thread pool to use for transmitting to the other side + of the operation. + utility_pool: A thread pool for utility tasks. + termination_action: A no-arg behavior to be called upon operation + completion. + operation_id: An object identifying the operation. + name: The name of the method being called during the operation. + payload: The first customer-significant value to be transmitted to the other + side. May be None if there is no such value or if the customer chose not + to pass it at operation invocation. + complete: A boolean indicating whether or not additional payloads will be + supplied by the customer. + timeout: A length of time in seconds to allow for the operation. + subscription: A base_interfaces.ServicedSubscription describing the + customer's interest in the results of the operation. + trace_id: A uuid.UUID identifying a set of related operations to which this + operation belongs. May be None. + + Returns: + A _FrontManagement object bundling together the + _interfaces.ReceptionManager, _interfaces.EmissionManager, + _context.OperationContext, and _interfaces.CancellationManager for the + operation. + """ + lock = threading.Lock() + with lock: + termination_manager = _termination.front_termination_manager( + work_pool, utility_pool, termination_action, subscription.category) + transmission_manager = _transmission.front_transmission_manager( + lock, transmission_pool, callback, operation_id, name, + subscription.category, trace_id, timeout, termination_manager) + operation_context = _context.OperationContext( + lock, operation_id, packets.Kind.SERVICED_FAILURE, + termination_manager, transmission_manager) + emission_manager = _emission.front_emission_manager( + lock, termination_manager, transmission_manager) + ingestion_manager = _ingestion.front_ingestion_manager( + lock, work_pool, subscription, termination_manager, + transmission_manager, operation_context) + expiration_manager = _expiration.front_expiration_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + timeout) + reception_manager = _reception.front_reception_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + expiration_manager) + cancellation_manager = _cancellation.CancellationManager( + lock, termination_manager, transmission_manager, ingestion_manager, + expiration_manager) + + transmission_manager.set_ingestion_and_expiration_managers( + ingestion_manager, expiration_manager) + operation_context.set_ingestion_and_expiration_managers( + ingestion_manager, expiration_manager) + emission_manager.set_ingestion_manager_and_expiration_manager( + ingestion_manager, expiration_manager) + ingestion_manager.set_expiration_manager(expiration_manager) + + transmission_manager.inmit(payload, complete) + + returned_reception_manager = ( + None if subscription.category == base_interfaces.NONE + else reception_manager) + + return _FrontManagement( + returned_reception_manager, emission_manager, operation_context, + cancellation_manager) + + +class Front(interfaces.Front): + """An implementation of interfaces.Front.""" + + def __init__(self, work_pool, transmission_pool, utility_pool): + """Constructor. + + Args: + work_pool: A thread pool to be used for executing customer code. + transmission_pool: A thread pool to be used for transmitting values to + the other side of the operation. + utility_pool: A thread pool to be used for utility tasks. + """ + self._endlette = _Endlette(utility_pool) + self._work_pool = work_pool + self._transmission_pool = transmission_pool + self._utility_pool = utility_pool + self._callback = None + + self._operations = {} + + def join_rear_link(self, rear_link): + """See interfaces.ForeLink.join_rear_link for specification.""" + with self._endlette: + self._callback = rear_link.accept_front_to_back_ticket + + def operation_stats(self): + """See base_interfaces.End.operation_stats for specification.""" + return self._endlette.operation_stats() + + def add_idle_action(self, action): + """See base_interfaces.End.add_idle_action for specification.""" + self._endlette.add_idle_action(action) + + def operate( + self, name, payload, complete, timeout, subscription, trace_id): + """See base_interfaces.Front.operate for specification.""" + operation_id = uuid.uuid4() + with self._endlette: + management = _front_operate( + self._callback, self._work_pool, self._transmission_pool, + self._utility_pool, self._endlette.terminal_action(operation_id), + operation_id, name, payload, complete, timeout, subscription, + trace_id) + self._endlette.add_operation(operation_id, management.reception) + return _EasyOperation( + management.emission, management.operation, management.cancellation) + + def accept_back_to_front_ticket(self, ticket): + """See interfaces.End.act for specification.""" + with self._endlette: + reception_manager = self._endlette.get_operation(ticket.operation_id) + if reception_manager: + reception_manager.receive_packet(ticket) + + +def _back_operate( + servicer, callback, work_pool, transmission_pool, utility_pool, + termination_action, ticket, default_timeout, maximum_timeout): + """Constructs objects necessary for back-side operation management. + + Also begins back-side operation by feeding the first received ticket into the + constructed _interfaces.ReceptionManager. + + Args: + servicer: An interfaces.Servicer for servicing operations. + callback: A callable that accepts packets.BackToFrontPackets and delivers + them to the other side of the operation. Execution of this callable may + take any arbitrary length of time. + work_pool: A thread pool in which to execute customer code. + transmission_pool: A thread pool to use for transmitting to the other side + of the operation. + utility_pool: A thread pool for utility tasks. + termination_action: A no-arg behavior to be called upon operation + completion. + ticket: The first packets.FrontToBackPacket received for the operation. + default_timeout: A length of time in seconds to be used as the default + time alloted for a single operation. + maximum_timeout: A length of time in seconds to be used as the maximum + time alloted for a single operation. + + Returns: + The _interfaces.ReceptionManager to be used for the operation. + """ + lock = threading.Lock() + with lock: + termination_manager = _termination.back_termination_manager( + work_pool, utility_pool, termination_action, ticket.subscription) + transmission_manager = _transmission.back_transmission_manager( + lock, transmission_pool, callback, ticket.operation_id, + termination_manager, ticket.subscription) + operation_context = _context.OperationContext( + lock, ticket.operation_id, packets.Kind.SERVICER_FAILURE, + termination_manager, transmission_manager) + emission_manager = _emission.back_emission_manager( + lock, termination_manager, transmission_manager) + ingestion_manager = _ingestion.back_ingestion_manager( + lock, work_pool, servicer, termination_manager, + transmission_manager, operation_context, emission_manager) + expiration_manager = _expiration.back_expiration_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + ticket.timeout, default_timeout, maximum_timeout) + reception_manager = _reception.back_reception_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + expiration_manager) + + transmission_manager.set_ingestion_and_expiration_managers( + ingestion_manager, expiration_manager) + operation_context.set_ingestion_and_expiration_managers( + ingestion_manager, expiration_manager) + emission_manager.set_ingestion_manager_and_expiration_manager( + ingestion_manager, expiration_manager) + ingestion_manager.set_expiration_manager(expiration_manager) + + reception_manager.receive_packet(ticket) + + return reception_manager + + +class Back(interfaces.Back): + """An implementation of interfaces.Back.""" + + def __init__( + self, servicer, work_pool, transmission_pool, utility_pool, + default_timeout, maximum_timeout): + """Constructor. + + Args: + servicer: An interfaces.Servicer for servicing operations. + work_pool: A thread pool in which to execute customer code. + transmission_pool: A thread pool to use for transmitting to the other side + of the operation. + utility_pool: A thread pool for utility tasks. + default_timeout: A length of time in seconds to be used as the default + time alloted for a single operation. + maximum_timeout: A length of time in seconds to be used as the maximum + time alloted for a single operation. + """ + self._endlette = _Endlette(utility_pool) + self._servicer = servicer + self._work_pool = work_pool + self._transmission_pool = transmission_pool + self._utility_pool = utility_pool + self._default_timeout = default_timeout + self._maximum_timeout = maximum_timeout + self._callback = None + + def join_fore_link(self, fore_link): + """See interfaces.RearLink.join_fore_link for specification.""" + with self._endlette: + self._callback = fore_link.accept_back_to_front_ticket + + def accept_front_to_back_ticket(self, ticket): + """See interfaces.RearLink.accept_front_to_back_ticket for specification.""" + with self._endlette: + reception_manager = self._endlette.get_operation(ticket.operation_id) + if reception_manager is None: + reception_manager = _back_operate( + self._servicer, self._callback, self._work_pool, + self._transmission_pool, self._utility_pool, + self._endlette.terminal_action(ticket.operation_id), ticket, + self._default_timeout, self._maximum_timeout) + self._endlette.add_operation(ticket.operation_id, reception_manager) + else: + reception_manager.receive_packet(ticket) + + def operation_stats(self): + """See base_interfaces.End.operation_stats for specification.""" + return self._endlette.operation_stats() + + def add_idle_action(self, action): + """See base_interfaces.End.add_idle_action for specification.""" + self._endlette.add_idle_action(action) diff --git a/src/python/_framework/base/packets/_expiration.py b/src/python/_framework/base/packets/_expiration.py new file mode 100644 index 0000000000..772e15f08c --- /dev/null +++ b/src/python/_framework/base/packets/_expiration.py @@ -0,0 +1,158 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation expiration.""" + +import time + +from _framework.base.packets import _interfaces +from _framework.base.packets import packets +from _framework.foundation import later + + +class _ExpirationManager(_interfaces.ExpirationManager): + """An implementation of _interfaces.ExpirationManager.""" + + def __init__( + self, lock, termination_manager, transmission_manager, ingestion_manager, + commencement, timeout, maximum_timeout): + """Constructor. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + ingestion_manager: The _interfaces.IngestionManager for the operation. + commencement: The time in seconds since the epoch at which the operation + began. + timeout: A length of time in seconds to allow for the operation to run. + maximum_timeout: The maximum length of time in seconds to allow for the + operation to run despite what is requested via this object's + change_timout method. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._ingestion_manager = ingestion_manager + self._commencement = commencement + self._maximum_timeout = maximum_timeout + + self._timeout = timeout + self._deadline = commencement + timeout + self._index = None + self._future = None + + def _expire(self, index): + with self._lock: + if self._future is not None and index == self._index: + self._future = None + self._termination_manager.abort(packets.Kind.EXPIRATION) + self._transmission_manager.abort(packets.Kind.EXPIRATION) + self._ingestion_manager.abort() + + def start(self): + self._index = 0 + self._future = later.later(self._timeout, lambda: self._expire(0)) + + def change_timeout(self, timeout): + if self._future is not None and timeout != self._timeout: + self._future.cancel() + new_timeout = min(timeout, self._maximum_timeout) + new_index = self._index + 1 + self._timeout = new_timeout + self._deadline = self._commencement + new_timeout + self._index = new_index + delay = self._deadline - time.time() + self._future = later.later( + delay, lambda: self._expire(new_index)) + + def deadline(self): + return self._deadline + + def abort(self): + if self._future: + self._future.cancel() + self._future = None + self._deadline_index = None + + +def front_expiration_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + timeout): + """Creates an _interfaces.ExpirationManager appropriate for front-side use. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + ingestion_manager: The _interfaces.IngestionManager for the operation. + timeout: A length of time in seconds to allow for the operation to run. + + Returns: + An _interfaces.ExpirationManager appropriate for front-side use. + """ + commencement = time.time() + expiration_manager = _ExpirationManager( + lock, termination_manager, transmission_manager, ingestion_manager, + commencement, timeout, timeout) + expiration_manager.start() + return expiration_manager + + +def back_expiration_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + timeout, default_timeout, maximum_timeout): + """Creates an _interfaces.ExpirationManager appropriate for back-side use. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + ingestion_manager: The _interfaces.IngestionManager for the operation. + timeout: A length of time in seconds to allow for the operation to run. May + be None in which case default_timeout will be used. + default_timeout: The default length of time in seconds to allow for the + operation to run if the front-side customer has not specified such a value + (or if the value they specified is not yet known). + maximum_timeout: The maximum length of time in seconds to allow for the + operation to run. + + Returns: + An _interfaces.ExpirationManager appropriate for back-side use. + """ + commencement = time.time() + expiration_manager = _ExpirationManager( + lock, termination_manager, transmission_manager, ingestion_manager, + commencement, default_timeout if timeout is None else timeout, + maximum_timeout) + expiration_manager.start() + return expiration_manager diff --git a/src/python/_framework/base/packets/_ingestion.py b/src/python/_framework/base/packets/_ingestion.py new file mode 100644 index 0000000000..ad5ed4cada --- /dev/null +++ b/src/python/_framework/base/packets/_ingestion.py @@ -0,0 +1,440 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for ingestion during an operation.""" + +import abc +import collections + +from _framework.base import exceptions +from _framework.base import interfaces +from _framework.base.packets import _constants +from _framework.base.packets import _interfaces +from _framework.base.packets import packets +from _framework.foundation import abandonment +from _framework.foundation import callable_util +from _framework.foundation import stream + +_CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!' +_CONSUME_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!' + + +class _ConsumerCreation(collections.namedtuple( + '_ConsumerCreation', ('consumer', 'remote_error', 'abandoned'))): + """A sum type for the outcome of ingestion initialization. + + Either consumer will be non-None, remote_error will be True, or abandoned will + be True. + + Attributes: + consumer: A stream.Consumer for ingesting payloads. + remote_error: A boolean indicating that the consumer could not be created + due to an error on the remote side of the operation. + abandoned: A boolean indicating that the consumer creation was abandoned. + """ + + +class _EmptyConsumer(stream.Consumer): + """A no-operative stream.Consumer that ignores all inputs and calls.""" + + def consume(self, value): + """See stream.Consumer.consume for specification.""" + + def terminate(self): + """See stream.Consumer.terminate for specification.""" + + def consume_and_terminate(self, value): + """See stream.Consumer.consume_and_terminate for specification.""" + + +class _ConsumerCreator(object): + """Common specification of different consumer-creating behavior.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def create_consumer(self, requirement): + """Creates the stream.Consumer to which customer payloads will be delivered. + + Any exceptions raised by this method should be attributed to and treated as + defects in the serviced or servicer code called by this method. + + Args: + requirement: A value required by this _ConsumerCreator for consumer + creation. + + Returns: + A _ConsumerCreation describing the result of consumer creation. + """ + raise NotImplementedError() + + +class _FrontConsumerCreator(_ConsumerCreator): + """A _ConsumerCreator appropriate for front-side use.""" + + def __init__(self, subscription, operation_context): + """Constructor. + + Args: + subscription: The serviced's interfaces.ServicedSubscription for the + operation. + operation_context: The interfaces.OperationContext object for the + operation. + """ + self._subscription = subscription + self._operation_context = operation_context + + def create_consumer(self, requirement): + """See _ConsumerCreator.create_consumer for specification.""" + if self._subscription.category == interfaces.FULL: + try: + return _ConsumerCreation( + self._subscription.ingestor.consumer(self._operation_context), + False, False) + except abandonment.Abandoned: + return _ConsumerCreation(None, False, True) + else: + return _ConsumerCreation(_EmptyConsumer(), False, False) + + +class _BackConsumerCreator(_ConsumerCreator): + """A _ConsumerCreator appropriate for back-side use.""" + + def __init__(self, servicer, operation_context, emission_consumer): + """Constructor. + + Args: + servicer: The interfaces.Servicer that will service the operation. + operation_context: The interfaces.OperationContext object for the + operation. + emission_consumer: The stream.Consumer object to which payloads emitted + from the operation will be passed. + """ + self._servicer = servicer + self._operation_context = operation_context + self._emission_consumer = emission_consumer + + def create_consumer(self, requirement): + """See _ConsumerCreator.create_consumer for full specification. + + Args: + requirement: The name of the Servicer method to be called during this + operation. + + Returns: + A _ConsumerCreation describing the result of consumer creation. + """ + try: + return _ConsumerCreation( + self._servicer.service( + requirement, self._operation_context, self._emission_consumer), + False, False) + except exceptions.NoSuchMethodError: + return _ConsumerCreation(None, True, False) + except abandonment.Abandoned: + return _ConsumerCreation(None, False, True) + + +class _WrappedConsumer(object): + """Wraps a consumer to catch the exceptions that it is allowed to throw.""" + + def __init__(self, consumer): + """Constructor. + + Args: + consumer: A stream.Consumer that may raise abandonment.Abandoned from any + of its methods. + """ + self._consumer = consumer + + def moar(self, payload, complete): + """Makes progress with the wrapped consumer. + + This method catches all exceptions allowed to be thrown by the wrapped + consumer. Any exceptions raised by this method should be blamed on the + customer-supplied consumer. + + Args: + payload: A customer-significant payload object. May be None only if + complete is True. + complete: Whether or not the end of the payload sequence has been reached. + May be False only if payload is not None. + + Returns: + True if the wrapped consumer made progress or False if the wrapped + consumer raised abandonment.Abandoned to indicate its abandonment of + progress. + """ + try: + if payload: + if complete: + self._consumer.consume_and_terminate(payload) + else: + self._consumer.consume(payload) + else: + self._consumer.terminate() + return True + except abandonment.Abandoned: + return False + + +class _IngestionManager(_interfaces.IngestionManager): + """An implementation of _interfaces.IngestionManager.""" + + def __init__( + self, lock, pool, consumer_creator, failure_kind, termination_manager, + transmission_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + consumer_creator: A _ConsumerCreator wrapping the portion of customer code + that when called returns the stream.Consumer with which the customer + code will ingest payload values. + failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or + packets.Kind.SERVICER_FAILURE describes local failure of customer code. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + """ + self._lock = lock + self._pool = pool + self._consumer_creator = consumer_creator + self._failure_kind = failure_kind + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = None + + self._wrapped_ingestion_consumer = None + self._pending_ingestion = [] + self._ingestion_complete = False + self._processing = False + + def set_expiration_manager(self, expiration_manager): + self._expiration_manager = expiration_manager + + def _abort_internal_only(self): + self._wrapped_ingestion_consumer = None + self._pending_ingestion = None + + def _abort_and_notify(self, outcome): + self._abort_internal_only() + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.abort() + + def _next(self): + """Computes the next step for ingestion. + + Returns: + A payload, complete, continue triplet indicating what payload (if any) is + available to feed into customer code, whether or not the sequence of + payloads has terminated, and whether or not there is anything + immediately actionable to call customer code to do. + """ + if self._pending_ingestion is None: + return None, False, False + elif self._pending_ingestion: + payload = self._pending_ingestion.pop(0) + complete = self._ingestion_complete and not self._pending_ingestion + return payload, complete, True + elif self._ingestion_complete: + return None, True, True + else: + return None, False, False + + def _process(self, wrapped_ingestion_consumer, payload, complete): + """A method to call to execute customer code. + + This object's lock must *not* be held when calling this method. + + Args: + wrapped_ingestion_consumer: The _WrappedConsumer with which to pass + payloads to customer code. + payload: A customer payload. May be None only if complete is True. + complete: Whether or not the sequence of payloads to pass to the customer + has concluded. + """ + while True: + consumption_outcome = callable_util.call_logging_exceptions( + wrapped_ingestion_consumer.moar, _CONSUME_EXCEPTION_LOG_MESSAGE, + payload, complete) + if consumption_outcome.exception is None: + if consumption_outcome.return_value: + with self._lock: + if complete: + self._pending_ingestion = None + self._termination_manager.ingestion_complete() + return + else: + payload, complete, moar = self._next() + if not moar: + self._processing = False + return + else: + with self._lock: + if self._pending_ingestion is not None: + self._abort_and_notify(self._failure_kind) + self._processing = False + return + else: + with self._lock: + self._abort_and_notify(self._failure_kind) + self._processing = False + return + + def start(self, requirement): + if self._pending_ingestion is not None: + def initialize(): + consumer_creation_outcome = callable_util.call_logging_exceptions( + self._consumer_creator.create_consumer, + _CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE, requirement) + if consumer_creation_outcome.return_value is None: + with self._lock: + self._abort_and_notify(self._failure_kind) + self._processing = False + elif consumer_creation_outcome.return_value.remote_error: + with self._lock: + self._abort_and_notify(packets.Kind.RECEPTION_FAILURE) + self._processing = False + elif consumer_creation_outcome.return_value.abandoned: + with self._lock: + if self._pending_ingestion is not None: + self._abort_and_notify(self._failure_kind) + self._processing = False + else: + wrapped_ingestion_consumer = _WrappedConsumer( + consumer_creation_outcome.return_value.consumer) + with self._lock: + self._wrapped_ingestion_consumer = wrapped_ingestion_consumer + payload, complete, moar = self._next() + if not moar: + self._processing = False + return + + self._process(wrapped_ingestion_consumer, payload, complete) + + self._pool.submit( + callable_util.with_exceptions_logged( + initialize, _constants.INTERNAL_ERROR_LOG_MESSAGE)) + self._processing = True + + def consume(self, payload): + if self._ingestion_complete: + self._abort_and_notify(self._failure_kind) + elif self._pending_ingestion is not None: + if self._processing: + self._pending_ingestion.append(payload) + else: + self._pool.submit( + callable_util.with_exceptions_logged( + self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._wrapped_ingestion_consumer, payload, False) + self._processing = True + + def terminate(self): + if self._ingestion_complete: + self._abort_and_notify(self._failure_kind) + else: + self._ingestion_complete = True + if self._pending_ingestion is not None and not self._processing: + self._pool.submit( + callable_util.with_exceptions_logged( + self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._wrapped_ingestion_consumer, None, True) + self._processing = True + + def consume_and_terminate(self, payload): + if self._ingestion_complete: + self._abort_and_notify(self._failure_kind) + else: + self._ingestion_complete = True + if self._pending_ingestion is not None: + if self._processing: + self._pending_ingestion.append(payload) + else: + self._pool.submit( + callable_util.with_exceptions_logged( + self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._wrapped_ingestion_consumer, payload, True) + self._processing = True + + def abort(self): + """See _interfaces.IngestionManager.abort for specification.""" + self._abort_internal_only() + + +def front_ingestion_manager( + lock, pool, subscription, termination_manager, transmission_manager, + operation_context): + """Creates an IngestionManager appropriate for front-side use. + + Args: + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + subscription: A base_interfaces.ServicedSubscription indicating the + customer's interest in the results of the operation. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + operation_context: A base_interfaces.OperationContext for the operation. + + Returns: + An IngestionManager appropriate for front-side use. + """ + ingestion_manager = _IngestionManager( + lock, pool, _FrontConsumerCreator(subscription, operation_context), + packets.Kind.SERVICED_FAILURE, termination_manager, transmission_manager) + ingestion_manager.start(None) + return ingestion_manager + + +def back_ingestion_manager( + lock, pool, servicer, termination_manager, transmission_manager, + operation_context, emission_consumer): + """Creates an IngestionManager appropriate for back-side use. + + Args: + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + servicer: A base_interfaces.Servicer for servicing the operation. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + operation_context: A base_interfaces.OperationContext for the operation. + emission_consumer: The _interfaces.EmissionConsumer for the operation. + + Returns: + An IngestionManager appropriate for back-side use. + """ + ingestion_manager = _IngestionManager( + lock, pool, _BackConsumerCreator( + servicer, operation_context, emission_consumer), + packets.Kind.SERVICER_FAILURE, termination_manager, transmission_manager) + return ingestion_manager diff --git a/src/python/_framework/base/packets/_interfaces.py b/src/python/_framework/base/packets/_interfaces.py new file mode 100644 index 0000000000..5f6c0593d0 --- /dev/null +++ b/src/python/_framework/base/packets/_interfaces.py @@ -0,0 +1,269 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Package-internal interfaces.""" + +import abc + +# base_interfaces and packets are referenced from specification in this module. +from _framework.base import interfaces as base_interfaces # pylint: disable=unused-import +from _framework.base.packets import packets # pylint: disable=unused-import +from _framework.foundation import stream + + +class TerminationManager(object): + """An object responsible for handling the termination of an operation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def is_active(self): + """Reports whether or not the operation is active. + + Returns: + True if the operation is active or False if the operation has terminated. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_callback(self, callback): + """Registers a callback to be called on operation termination. + + If the operation has already terminated, the callback will be called + immediately. + + Args: + callback: A callable that will be passed one of base_interfaces.COMPLETED, + base_interfaces.CANCELLED, base_interfaces.EXPIRED, + base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE, + base_interfaces.SERVICER_FAILURE, or base_interfaces.SERVICED_FAILURE. + """ + raise NotImplementedError() + + @abc.abstractmethod + def emission_complete(self): + """Indicates that emissions from customer code have completed.""" + raise NotImplementedError() + + @abc.abstractmethod + def transmission_complete(self): + """Indicates that transmissions to the remote end are complete.""" + raise NotImplementedError() + + @abc.abstractmethod + def ingestion_complete(self): + """Indicates that customer code ingestion of received values is complete.""" + raise NotImplementedError() + + @abc.abstractmethod + def abort(self, kind): + """Indicates that the operation must abort for the indicated reason. + + Args: + kind: A value of packets.Kind indicating operation abortion. + """ + raise NotImplementedError() + + +class TransmissionManager(object): + """A manager responsible for transmitting to the other end of an operation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def inmit(self, emission, complete): + """Accepts a value for transmission to the other end of the operation. + + Args: + emission: A value of some significance to the customer to be transmitted + to the other end of the operation. May be None only if complete is True. + complete: A boolean that if True indicates that customer code has emitted + all values it intends to emit. + """ + raise NotImplementedError() + + @abc.abstractmethod + def abort(self, kind): + """Indicates that the operation has aborted for the indicated reason. + + Args: + kind: A value of packets.Kind indicating operation abortion. + """ + raise NotImplementedError() + + +class EmissionManager(stream.Consumer): + """A manager of values emitted by customer code.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_ingestion_manager_and_expiration_manager( + self, ingestion_manager, expiration_manager): + """Sets two other objects with which this EmissionManager will cooperate. + + Args: + ingestion_manager: The IngestionManager for the operation. + expiration_manager: The ExpirationManager for the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def consume(self, value): + """Accepts a value emitted by customer code. + + This method should only be called by customer code. + + Args: + value: Any value of significance to the customer. + """ + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self): + """Indicates that no more values will be emitted by customer code. + + This method should only be called by customer code. + + Implementations of this method may be idempotent and forgive customer code + calling this method more than once. + """ + raise NotImplementedError() + + @abc.abstractmethod + def consume_and_terminate(self, value): + """Accepts the last value emitted by customer code. + + This method should only be called by customer code. + + Args: + value: Any value of significance to the customer. + """ + raise NotImplementedError() + + +class IngestionManager(stream.Consumer): + """A manager responsible for executing customer code.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_expiration_manager(self, expiration_manager): + """Sets the ExpirationManager with which this object will cooperate.""" + + @abc.abstractmethod + def start(self, requirement): + """Commences execution of customer code. + + Args: + requirement: Some value unavailable at the time of this object's + construction that is required to begin executing customer code. + """ + raise NotImplementedError() + + @abc.abstractmethod + def consume(self, payload): + """Accepts a customer-significant value to be supplied to customer code. + + Args: + payload: Some customer-significant value. + """ + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self): + """Indicates the end of values to be supplied to customer code.""" + raise NotImplementedError() + + @abc.abstractmethod + def consume_and_terminate(self, payload): + """Accepts the last value to be supplied to customer code. + + Args: + payload: Some customer-significant value (and the last such value). + """ + raise NotImplementedError() + + @abc.abstractmethod + def abort(self): + """Indicates to this manager that the operation has aborted.""" + raise NotImplementedError() + + +class ExpirationManager(object): + """A manager responsible for aborting the operation if it runs out of time.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def change_timeout(self, timeout): + """Changes the timeout allotted for the operation. + + Operation duration is always measure from the beginning of the operation; + calling this method changes the operation's allotted time to timeout total + seconds, not timeout seconds from the time of this method call. + + Args: + timeout: A length of time in seconds to allow for the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deadline(self): + """Returns the time until which the operation is allowed to run. + + Returns: + The time (seconds since the epoch) at which the operation will expire. + """ + raise NotImplementedError() + + @abc.abstractmethod + def abort(self): + """Indicates to this manager that the operation has aborted.""" + raise NotImplementedError() + + +class ReceptionManager(object): + """A manager responsible for receiving packets from the other end.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def receive_packet(self, packet): + """Handle a packet from the other side of the operation. + + Args: + packet: A packets.BackToFrontPacket or packets.FrontToBackPacket + appropriate to this end of the operation and this object. + """ + raise NotImplementedError() + + +class CancellationManager(object): + """A manager of operation cancellation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def cancel(self): + """Cancels the operation.""" + raise NotImplementedError() diff --git a/src/python/_framework/base/packets/_reception.py b/src/python/_framework/base/packets/_reception.py new file mode 100644 index 0000000000..a2a3823d28 --- /dev/null +++ b/src/python/_framework/base/packets/_reception.py @@ -0,0 +1,394 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for packet reception.""" + +import abc + +from _framework.base.packets import _interfaces +from _framework.base.packets import packets + + +class _Receiver(object): + """Common specification of different packet-handling behavior.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def abort_if_abortive(self, packet): + """Aborts the operation if the packet is abortive. + + Args: + packet: A just-arrived packet. + + Returns: + A boolean indicating whether or not this Receiver aborted the operation + based on the packet. + """ + raise NotImplementedError() + + @abc.abstractmethod + def receive(self, packet): + """Handles a just-arrived packet. + + Args: + packet: A just-arrived packet. + + Returns: + A boolean indicating whether or not the packet was terminal (i.e. whether + or not non-abortive packets are legal after this one). + """ + raise NotImplementedError() + + @abc.abstractmethod + def reception_failure(self): + """Aborts the operation with an indication of reception failure.""" + raise NotImplementedError() + + +def _abort( + category, termination_manager, transmission_manager, ingestion_manager, + expiration_manager): + """Indicates abortion with the given category to the given managers.""" + termination_manager.abort(category) + transmission_manager.abort(category) + ingestion_manager.abort() + expiration_manager.abort() + + +def _abort_if_abortive( + packet, abortive, termination_manager, transmission_manager, + ingestion_manager, expiration_manager): + """Determines a packet's being abortive and if so aborts the operation. + + Args: + packet: A just-arrived packet. + abortive: A callable that takes a packet and returns an operation category + indicating that the operation should be aborted or None indicating that + the operation should not be aborted. + termination_manager: The operation's _interfaces.TerminationManager. + transmission_manager: The operation's _interfaces.TransmissionManager. + ingestion_manager: The operation's _interfaces.IngestionManager. + expiration_manager: The operation's _interfaces.ExpirationManager. + + Returns: + True if the operation was aborted; False otherwise. + """ + abort_category = abortive(packet) + if abort_category is None: + return False + else: + _abort( + abort_category, termination_manager, transmission_manager, + ingestion_manager, expiration_manager) + return True + + +def _reception_failure( + termination_manager, transmission_manager, ingestion_manager, + expiration_manager): + """Aborts the operation with an indication of reception failure.""" + _abort( + packets.Kind.RECEPTION_FAILURE, termination_manager, transmission_manager, + ingestion_manager, expiration_manager) + + +class _BackReceiver(_Receiver): + """Packet-handling specific to the back side of an operation.""" + + def __init__( + self, termination_manager, transmission_manager, ingestion_manager, + expiration_manager): + """Constructor. + + Args: + termination_manager: The operation's _interfaces.TerminationManager. + transmission_manager: The operation's _interfaces.TransmissionManager. + ingestion_manager: The operation's _interfaces.IngestionManager. + expiration_manager: The operation's _interfaces.ExpirationManager. + """ + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._ingestion_manager = ingestion_manager + self._expiration_manager = expiration_manager + + self._first_packet_seen = False + self._last_packet_seen = False + + def _abortive(self, packet): + """Determines whether or not (and if so, how) a packet is abortive. + + Args: + packet: A just-arrived packet. + + Returns: + One of packets.Kind.CANCELLATION, packets.Kind.SERVICED_FAILURE, or + packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive + and how, or None, indicating that the packet is not abortive. + """ + if packet.kind is packets.Kind.CANCELLATION: + return packets.Kind.CANCELLATION + elif packet.kind is packets.Kind.EXPIRATION: + return packets.Kind.EXPIRATION + elif packet.kind is packets.Kind.SERVICED_FAILURE: + return packets.Kind.SERVICED_FAILURE + elif packet.kind is packets.Kind.RECEPTION_FAILURE: + return packets.Kind.SERVICED_FAILURE + elif (packet.kind in (packets.Kind.COMMENCEMENT, packets.Kind.ENTIRE) and + self._first_packet_seen): + return packets.Kind.RECEPTION_FAILURE + elif self._last_packet_seen: + return packets.Kind.RECEPTION_FAILURE + else: + return None + + def abort_if_abortive(self, packet): + """See _Receiver.abort_if_abortive for specification.""" + return _abort_if_abortive( + packet, self._abortive, self._termination_manager, + self._transmission_manager, self._ingestion_manager, + self._expiration_manager) + + def receive(self, packet): + """See _Receiver.receive for specification.""" + if packet.timeout is not None: + self._expiration_manager.change_timeout(packet.timeout) + + if packet.kind is packets.Kind.COMMENCEMENT: + self._first_packet_seen = True + self._ingestion_manager.start(packet.name) + if packet.payload is not None: + self._ingestion_manager.consume(packet.payload) + elif packet.kind is packets.Kind.CONTINUATION: + self._ingestion_manager.consume(packet.payload) + elif packet.kind is packets.Kind.COMPLETION: + self._last_packet_seen = True + if packet.payload is None: + self._ingestion_manager.terminate() + else: + self._ingestion_manager.consume_and_terminate(packet.payload) + else: + self._first_packet_seen = True + self._last_packet_seen = True + self._ingestion_manager.start(packet.name) + if packet.payload is None: + self._ingestion_manager.terminate() + else: + self._ingestion_manager.consume_and_terminate(packet.payload) + + def reception_failure(self): + """See _Receiver.reception_failure for specification.""" + _reception_failure( + self._termination_manager, self._transmission_manager, + self._ingestion_manager, self._expiration_manager) + + +class _FrontReceiver(_Receiver): + """Packet-handling specific to the front side of an operation.""" + + def __init__( + self, termination_manager, transmission_manager, ingestion_manager, + expiration_manager): + """Constructor. + + Args: + termination_manager: The operation's _interfaces.TerminationManager. + transmission_manager: The operation's _interfaces.TransmissionManager. + ingestion_manager: The operation's _interfaces.IngestionManager. + expiration_manager: The operation's _interfaces.ExpirationManager. + """ + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._ingestion_manager = ingestion_manager + self._expiration_manager = expiration_manager + + self._last_packet_seen = False + + def _abortive(self, packet): + """Determines whether or not (and if so, how) a packet is abortive. + + Args: + packet: A just-arrived packet. + + Returns: + One of packets.Kind.EXPIRATION, packets.Kind.SERVICER_FAILURE, or + packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive + and how, or None, indicating that the packet is not abortive. + """ + if packet.kind is packets.Kind.EXPIRATION: + return packets.Kind.EXPIRATION + elif packet.kind is packets.Kind.SERVICER_FAILURE: + return packets.Kind.SERVICER_FAILURE + elif packet.kind is packets.Kind.RECEPTION_FAILURE: + return packets.Kind.SERVICER_FAILURE + elif self._last_packet_seen: + return packets.Kind.RECEPTION_FAILURE + else: + return None + + def abort_if_abortive(self, packet): + """See _Receiver.abort_if_abortive for specification.""" + return _abort_if_abortive( + packet, self._abortive, self._termination_manager, + self._transmission_manager, self._ingestion_manager, + self._expiration_manager) + + def receive(self, packet): + """See _Receiver.receive for specification.""" + if packet.kind is packets.Kind.CONTINUATION: + self._ingestion_manager.consume(packet.payload) + elif packet.kind is packets.Kind.COMPLETION: + self._last_packet_seen = True + if packet.payload is None: + self._ingestion_manager.terminate() + else: + self._ingestion_manager.consume_and_terminate(packet.payload) + + def reception_failure(self): + """See _Receiver.reception_failure for specification.""" + _reception_failure( + self._termination_manager, self._transmission_manager, + self._ingestion_manager, self._expiration_manager) + + +class _ReceptionManager(_interfaces.ReceptionManager): + """A ReceptionManager based around a _Receiver passed to it.""" + + def __init__(self, lock, receiver): + """Constructor. + + Args: + lock: The operation-servicing-wide lock object. + receiver: A _Receiver responsible for handling received packets. + """ + self._lock = lock + self._receiver = receiver + + self._lowest_unseen_sequence_number = 0 + self._out_of_sequence_packets = {} + self._completed_sequence_number = None + self._aborted = False + + def _sequence_failure(self, packet): + """Determines a just-arrived packet's sequential legitimacy. + + Args: + packet: A just-arrived packet. + + Returns: + True if the packet is sequentially legitimate; False otherwise. + """ + if packet.sequence_number < self._lowest_unseen_sequence_number: + return True + elif packet.sequence_number in self._out_of_sequence_packets: + return True + elif (self._completed_sequence_number is not None and + self._completed_sequence_number <= packet.sequence_number): + return True + else: + return False + + def _process(self, packet): + """Process those packets ready to be processed. + + Args: + packet: A just-arrived packet the sequence number of which matches this + _ReceptionManager's _lowest_unseen_sequence_number field. + """ + while True: + completed = self._receiver.receive(packet) + if completed: + self._out_of_sequence_packets.clear() + self._completed_sequence_number = packet.sequence_number + self._lowest_unseen_sequence_number = packet.sequence_number + 1 + return + else: + next_packet = self._out_of_sequence_packets.pop( + packet.sequence_number + 1, None) + if next_packet is None: + self._lowest_unseen_sequence_number = packet.sequence_number + 1 + return + else: + packet = next_packet + + def receive_packet(self, packet): + """See _interfaces.ReceptionManager.receive_packet for specification.""" + with self._lock: + if self._aborted: + return + elif self._sequence_failure(packet): + self._receiver.reception_failure() + self._aborted = True + elif self._receiver.abort_if_abortive(packet): + self._aborted = True + elif packet.sequence_number == self._lowest_unseen_sequence_number: + self._process(packet) + else: + self._out_of_sequence_packets[packet.sequence_number] = packet + + +def front_reception_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + expiration_manager): + """Creates a _interfaces.ReceptionManager for front-side use. + + Args: + lock: The operation-servicing-wide lock object. + termination_manager: The operation's _interfaces.TerminationManager. + transmission_manager: The operation's _interfaces.TransmissionManager. + ingestion_manager: The operation's _interfaces.IngestionManager. + expiration_manager: The operation's _interfaces.ExpirationManager. + + Returns: + A _interfaces.ReceptionManager appropriate for front-side use. + """ + return _ReceptionManager( + lock, _FrontReceiver( + termination_manager, transmission_manager, ingestion_manager, + expiration_manager)) + + +def back_reception_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + expiration_manager): + """Creates a _interfaces.ReceptionManager for back-side use. + + Args: + lock: The operation-servicing-wide lock object. + termination_manager: The operation's _interfaces.TerminationManager. + transmission_manager: The operation's _interfaces.TransmissionManager. + ingestion_manager: The operation's _interfaces.IngestionManager. + expiration_manager: The operation's _interfaces.ExpirationManager. + + Returns: + A _interfaces.ReceptionManager appropriate for back-side use. + """ + return _ReceptionManager( + lock, _BackReceiver( + termination_manager, transmission_manager, ingestion_manager, + expiration_manager)) diff --git a/src/python/_framework/base/packets/_termination.py b/src/python/_framework/base/packets/_termination.py new file mode 100644 index 0000000000..d586c2167b --- /dev/null +++ b/src/python/_framework/base/packets/_termination.py @@ -0,0 +1,201 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation termination.""" + +from _framework.base import interfaces +from _framework.base.packets import _constants +from _framework.base.packets import _interfaces +from _framework.base.packets import packets +from _framework.foundation import callable_util + +_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!' + +# TODO(nathaniel): enum module. +_EMISSION = 'emission' +_TRANSMISSION = 'transmission' +_INGESTION = 'ingestion' + +_FRONT_NOT_LISTENING_REQUIREMENTS = (_TRANSMISSION,) +_BACK_NOT_LISTENING_REQUIREMENTS = (_EMISSION, _INGESTION,) +_LISTENING_REQUIREMENTS = (_TRANSMISSION, _INGESTION,) + +_KINDS_TO_OUTCOMES = { + packets.Kind.COMPLETION: interfaces.COMPLETED, + packets.Kind.CANCELLATION: interfaces.CANCELLED, + packets.Kind.EXPIRATION: interfaces.EXPIRED, + packets.Kind.RECEPTION_FAILURE: interfaces.RECEPTION_FAILURE, + packets.Kind.TRANSMISSION_FAILURE: interfaces.TRANSMISSION_FAILURE, + packets.Kind.SERVICER_FAILURE: interfaces.SERVICER_FAILURE, + packets.Kind.SERVICED_FAILURE: interfaces.SERVICED_FAILURE, + } + + +class _TerminationManager(_interfaces.TerminationManager): + """An implementation of _interfaces.TerminationManager.""" + + def __init__( + self, work_pool, utility_pool, action, requirements, local_failure): + """Constructor. + + Args: + work_pool: A thread pool in which customer work will be done. + utility_pool: A thread pool in which work utility work will be done. + action: An action to call on operation termination. + requirements: A combination of _EMISSION, _TRANSMISSION, and _INGESTION + identifying what must finish for the operation to be considered + completed. + local_failure: A packets.Kind specifying what constitutes local failure of + customer work. + """ + self._work_pool = work_pool + self._utility_pool = utility_pool + self._action = action + self._local_failure = local_failure + self._has_locally_failed = False + + self._outstanding_requirements = set(requirements) + self._kind = None + self._callbacks = [] + + def _terminate(self, kind): + """Terminates the operation. + + Args: + kind: One of packets.Kind.COMPLETION, packets.Kind.CANCELLATION, + packets.Kind.EXPIRATION, packets.Kind.RECEPTION_FAILURE, + packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or + packets.Kind.SERVICED_FAILURE. + """ + self._outstanding_requirements = None + callbacks = list(self._callbacks) + self._callbacks = None + self._kind = kind + outcome = _KINDS_TO_OUTCOMES[kind] + + act = callable_util.with_exceptions_logged( + self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE) + + if self._has_locally_failed: + self._utility_pool.submit(act, outcome) + else: + def call_callbacks_and_act(callbacks, outcome): + for callback in callbacks: + callback_outcome = callable_util.call_logging_exceptions( + callback, _CALLBACK_EXCEPTION_LOG_MESSAGE, outcome) + if callback_outcome.exception is not None: + outcome = _KINDS_TO_OUTCOMES[self._local_failure] + break + self._utility_pool.submit(act, outcome) + + self._work_pool.submit(callable_util.with_exceptions_logged( + call_callbacks_and_act, + _constants.INTERNAL_ERROR_LOG_MESSAGE), + callbacks, outcome) + + def is_active(self): + """See _interfaces.TerminationManager.is_active for specification.""" + return self._outstanding_requirements is not None + + def add_callback(self, callback): + """See _interfaces.TerminationManager.add_callback for specification.""" + if not self._has_locally_failed: + if self._outstanding_requirements is None: + self._work_pool.submit( + callable_util.with_exceptions_logged( + callback, _CALLBACK_EXCEPTION_LOG_MESSAGE), + _KINDS_TO_OUTCOMES[self._kind]) + else: + self._callbacks.append(callback) + + def emission_complete(self): + """See superclass method for specification.""" + if self._outstanding_requirements is not None: + self._outstanding_requirements.discard(_EMISSION) + if not self._outstanding_requirements: + self._terminate(packets.Kind.COMPLETION) + + def transmission_complete(self): + """See superclass method for specification.""" + if self._outstanding_requirements is not None: + self._outstanding_requirements.discard(_TRANSMISSION) + if not self._outstanding_requirements: + self._terminate(packets.Kind.COMPLETION) + + def ingestion_complete(self): + """See superclass method for specification.""" + if self._outstanding_requirements is not None: + self._outstanding_requirements.discard(_INGESTION) + if not self._outstanding_requirements: + self._terminate(packets.Kind.COMPLETION) + + def abort(self, kind): + """See _interfaces.TerminationManager.abort for specification.""" + if kind == self._local_failure: + self._has_failed_locally = True + if self._outstanding_requirements is not None: + self._terminate(kind) + + +def front_termination_manager(work_pool, utility_pool, action, subscription): + """Creates a TerminationManager appropriate for front-side use. + + Args: + work_pool: A thread pool in which customer work will be done. + utility_pool: A thread pool in which work utility work will be done. + action: An action to call on operation termination. + subscription: One of interfaces.FULL, interfaces.termination_only, or + interfaces.NONE. + + Returns: + A TerminationManager appropriate for front-side use. + """ + return _TerminationManager( + work_pool, utility_pool, action, + _FRONT_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else + _LISTENING_REQUIREMENTS, packets.Kind.SERVICED_FAILURE) + + +def back_termination_manager(work_pool, utility_pool, action, subscription): + """Creates a TerminationManager appropriate for back-side use. + + Args: + work_pool: A thread pool in which customer work will be done. + utility_pool: A thread pool in which work utility work will be done. + action: An action to call on operation termination. + subscription: One of interfaces.FULL, interfaces.termination_only, or + interfaces.NONE. + + Returns: + A TerminationManager appropriate for back-side use. + """ + return _TerminationManager( + work_pool, utility_pool, action, + _BACK_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else + _LISTENING_REQUIREMENTS, packets.Kind.SERVICER_FAILURE) diff --git a/src/python/_framework/base/packets/_transmission.py b/src/python/_framework/base/packets/_transmission.py new file mode 100644 index 0000000000..006128774d --- /dev/null +++ b/src/python/_framework/base/packets/_transmission.py @@ -0,0 +1,393 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for packet transmission during an operation.""" + +import abc + +from _framework.base import interfaces +from _framework.base.packets import _constants +from _framework.base.packets import _interfaces +from _framework.base.packets import packets +from _framework.foundation import callable_util + +_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' + +_FRONT_TO_BACK_NO_TRANSMISSION_KINDS = ( + packets.Kind.SERVICER_FAILURE, + ) +_BACK_TO_FRONT_NO_TRANSMISSION_KINDS = ( + packets.Kind.CANCELLATION, + packets.Kind.SERVICED_FAILURE, + ) + + +class _Packetizer(object): + """Common specification of different packet-creating behavior.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def packetize(self, operation_id, sequence_number, payload, complete): + """Creates a packet indicating ordinary operation progress. + + Args: + operation_id: The operation ID for the current operation. + sequence_number: A sequence number for the packet. + payload: A customer payload object. May be None if sequence_number is + zero or complete is true. + complete: A boolean indicating whether or not the packet should describe + itself as (but for a later indication of operation abortion) the last + packet to be sent. + + Returns: + An object of an appropriate type suitable for transmission to the other + side of the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def packetize_abortion(self, operation_id, sequence_number, kind): + """Creates a packet indicating that the operation is aborted. + + Args: + operation_id: The operation ID for the current operation. + sequence_number: A sequence number for the packet. + kind: One of the values of packets.Kind indicating operational abortion. + + Returns: + An object of an appropriate type suitable for transmission to the other + side of the operation, or None if transmission is not appropriate for + the given kind. + """ + raise NotImplementedError() + + +class _FrontPacketizer(_Packetizer): + """Front-side packet-creating behavior.""" + + def __init__(self, name, subscription, trace_id, timeout): + """Constructor. + + Args: + name: The name of the operation. + subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or + interfaces.NONE describing the interest the front has in packets sent + from the back. + trace_id: A uuid.UUID identifying a set of related operations to which + this operation belongs. + timeout: A length of time in seconds to allow for the entire operation. + """ + self._name = name + self._subscription = subscription + self._trace_id = trace_id + self._timeout = timeout + + def packetize(self, operation_id, sequence_number, payload, complete): + """See _Packetizer.packetize for specification.""" + if sequence_number: + return packets.FrontToBackPacket( + operation_id, sequence_number, + packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION, + self._name, self._subscription, self._trace_id, payload, + self._timeout) + else: + return packets.FrontToBackPacket( + operation_id, 0, + packets.Kind.ENTIRE if complete else packets.Kind.COMMENCEMENT, + self._name, self._subscription, self._trace_id, payload, + self._timeout) + + def packetize_abortion(self, operation_id, sequence_number, kind): + """See _Packetizer.packetize_abortion for specification.""" + if kind in _FRONT_TO_BACK_NO_TRANSMISSION_KINDS: + return None + else: + return packets.FrontToBackPacket( + operation_id, sequence_number, kind, None, None, None, None, None) + + +class _BackPacketizer(_Packetizer): + """Back-side packet-creating behavior.""" + + def packetize(self, operation_id, sequence_number, payload, complete): + """See _Packetizer.packetize for specification.""" + return packets.BackToFrontPacket( + operation_id, sequence_number, + packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION, + payload) + + def packetize_abortion(self, operation_id, sequence_number, kind): + """See _Packetizer.packetize_abortion for specification.""" + if kind in _BACK_TO_FRONT_NO_TRANSMISSION_KINDS: + return None + else: + return packets.BackToFrontPacket( + operation_id, sequence_number, kind, None) + + +class TransmissionManager(_interfaces.TransmissionManager): + """A _interfaces.TransmissionManager on which other managers may be set.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_ingestion_and_expiration_managers( + self, ingestion_manager, expiration_manager): + """Sets two of the other managers with which this manager may interact. + + Args: + ingestion_manager: The _interfaces.IngestionManager associated with the + current operation. + expiration_manager: The _interfaces.ExpirationManager associated with the + current operation. + """ + raise NotImplementedError() + + +class _EmptyTransmissionManager(TransmissionManager): + """A completely no-operative _interfaces.TransmissionManager.""" + + def set_ingestion_and_expiration_managers( + self, ingestion_manager, expiration_manager): + """See overriden method for specification.""" + + def inmit(self, emission, complete): + """See _interfaces.TransmissionManager.inmit for specification.""" + + def abort(self, category): + """See _interfaces.TransmissionManager.abort for specification.""" + + +class _TransmittingTransmissionManager(TransmissionManager): + """A TransmissionManager implementation that sends packets.""" + + def __init__( + self, lock, pool, callback, operation_id, packetizer, + termination_manager): + """Constructor. + + Args: + lock: The operation-servicing-wide lock object. + pool: A thread pool in which the work of transmitting packets will be + performed. + callback: A callable that accepts packets and sends them to the other side + of the operation. + operation_id: The operation's ID. + packetizer: A _Packetizer for packet creation. + termination_manager: The _interfaces.TerminationManager associated with + this operation. + """ + self._lock = lock + self._pool = pool + self._callback = callback + self._operation_id = operation_id + self._packetizer = packetizer + self._termination_manager = termination_manager + self._ingestion_manager = None + self._expiration_manager = None + + self._emissions = [] + self._emission_complete = False + self._kind = None + self._lowest_unused_sequence_number = 0 + self._transmitting = False + + def set_ingestion_and_expiration_managers( + self, ingestion_manager, expiration_manager): + """See overridden method for specification.""" + self._ingestion_manager = ingestion_manager + self._expiration_manager = expiration_manager + + def _lead_packet(self, emission, complete): + """Creates a packet suitable for leading off the transmission loop. + + Args: + emission: A customer payload object to be sent to the other side of the + operation. + complete: Whether or not the sequence of customer payloads ends with + the passed object. + + Returns: + A packet with which to lead off the transmission loop. + """ + sequence_number = self._lowest_unused_sequence_number + self._lowest_unused_sequence_number += 1 + return self._packetizer.packetize( + self._operation_id, sequence_number, emission, complete) + + def _abortive_response_packet(self, kind): + """Creates a packet indicating operation abortion. + + Args: + kind: One of the values of packets.Kind indicating operational abortion. + + Returns: + A packet indicating operation abortion. + """ + packet = self._packetizer.packetize_abortion( + self._operation_id, self._lowest_unused_sequence_number, kind) + if packet is None: + return None + else: + self._lowest_unused_sequence_number += 1 + return packet + + def _next_packet(self): + """Creates the next packet to be sent to the other side of the operation. + + Returns: + A (completed, packet) tuple comprised of a boolean indicating whether or + not the sequence of packets has completed normally and a packet to send + to the other side if the sequence of packets hasn't completed. The tuple + will never have both a True first element and a non-None second element. + """ + if self._emissions is None: + return False, None + elif self._kind is None: + if self._emissions: + payload = self._emissions.pop(0) + complete = self._emission_complete and not self._emissions + sequence_number = self._lowest_unused_sequence_number + self._lowest_unused_sequence_number += 1 + return complete, self._packetizer.packetize( + self._operation_id, sequence_number, payload, complete) + else: + return self._emission_complete, None + else: + packet = self._abortive_response_packet(self._kind) + self._emissions = None + return False, None if packet is None else packet + + def _transmit(self, packet): + """Commences the transmission loop sending packets. + + Args: + packet: A packet to be sent to the other side of the operation. + """ + def transmit(packet): + while True: + transmission_outcome = callable_util.call_logging_exceptions( + self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, packet) + if transmission_outcome.exception is None: + with self._lock: + complete, packet = self._next_packet() + if packet is None: + if complete: + self._termination_manager.transmission_complete() + self._transmitting = False + return + else: + with self._lock: + self._emissions = None + self._termination_manager.abort(packets.Kind.TRANSMISSION_FAILURE) + self._ingestion_manager.abort() + self._expiration_manager.abort() + self._transmitting = False + return + + self._pool.submit(callable_util.with_exceptions_logged( + transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), packet) + self._transmitting = True + + def inmit(self, emission, complete): + """See _interfaces.TransmissionManager.inmit for specification.""" + if self._emissions is not None and self._kind is None: + self._emission_complete = complete + if self._transmitting: + self._emissions.append(emission) + else: + self._transmit(self._lead_packet(emission, complete)) + + def abort(self, kind): + """See _interfaces.TransmissionManager.abort for specification.""" + if self._emissions is not None and self._kind is None: + self._kind = kind + if not self._transmitting: + packet = self._abortive_response_packet(kind) + self._emissions = None + if packet is not None: + self._transmit(packet) + + +def front_transmission_manager( + lock, pool, callback, operation_id, name, subscription, trace_id, timeout, + termination_manager): + """Creates a TransmissionManager appropriate for front-side use. + + Args: + lock: The operation-servicing-wide lock object. + pool: A thread pool in which the work of transmitting packets will be + performed. + callback: A callable that accepts packets and sends them to the other side + of the operation. + operation_id: The operation's ID. + name: The name of the operation. + subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or + interfaces.NONE describing the interest the front has in packets sent + from the back. + trace_id: A uuid.UUID identifying a set of related operations to which + this operation belongs. + timeout: A length of time in seconds to allow for the entire operation. + termination_manager: The _interfaces.TerminationManager associated with + this operation. + + Returns: + A TransmissionManager appropriate for front-side use. + """ + return _TransmittingTransmissionManager( + lock, pool, callback, operation_id, _FrontPacketizer( + name, subscription, trace_id, timeout), + termination_manager) + + +def back_transmission_manager( + lock, pool, callback, operation_id, termination_manager, subscription): + """Creates a TransmissionManager appropriate for back-side use. + + Args: + lock: The operation-servicing-wide lock object. + pool: A thread pool in which the work of transmitting packets will be + performed. + callback: A callable that accepts packets and sends them to the other side + of the operation. + operation_id: The operation's ID. + termination_manager: The _interfaces.TerminationManager associated with + this operation. + subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or + interfaces.NONE describing the interest the front has in packets sent from + the back. + + Returns: + A TransmissionManager appropriate for back-side use. + """ + if subscription == interfaces.NONE: + return _EmptyTransmissionManager() + else: + return _TransmittingTransmissionManager( + lock, pool, callback, operation_id, _BackPacketizer(), + termination_manager) diff --git a/src/python/_framework/base/packets/implementations.py b/src/python/_framework/base/packets/implementations.py new file mode 100644 index 0000000000..2f07054d4d --- /dev/null +++ b/src/python/_framework/base/packets/implementations.py @@ -0,0 +1,77 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Entry points into the packet-exchange-based implementation the base layer.""" + +# interfaces is referenced from specification in this module. +from _framework.base.packets import _ends +from _framework.base.packets import interfaces # pylint: disable=unused-import + + +def front(work_pool, transmission_pool, utility_pool): + """Factory function for creating interfaces.Fronts. + + Args: + work_pool: A thread pool to be used for doing work within the created Front + object. + transmission_pool: A thread pool to be used within the created Front object + for transmitting values to some Back object. + utility_pool: A thread pool to be used within the created Front object for + utility tasks. + + Returns: + An interfaces.Front. + """ + return _ends.Front(work_pool, transmission_pool, utility_pool) + + +def back( + servicer, work_pool, transmission_pool, utility_pool, default_timeout, + maximum_timeout): + """Factory function for creating interfaces.Backs. + + Args: + servicer: An interfaces.Servicer for servicing operations. + work_pool: A thread pool to be used for doing work within the created Back + object. + transmission_pool: A thread pool to be used within the created Back object + for transmitting values to some Front object. + utility_pool: A thread pool to be used within the created Back object for + utility tasks. + default_timeout: A length of time in seconds to be used as the default + time alloted for a single operation. + maximum_timeout: A length of time in seconds to be used as the maximum + time alloted for a single operation. + + Returns: + An interfaces.Back. + """ + return _ends.Back( + servicer, work_pool, transmission_pool, utility_pool, default_timeout, + maximum_timeout) diff --git a/src/python/_framework/base/packets/implementations_test.py b/src/python/_framework/base/packets/implementations_test.py new file mode 100644 index 0000000000..8bb5353176 --- /dev/null +++ b/src/python/_framework/base/packets/implementations_test.py @@ -0,0 +1,80 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests for _framework.base.packets.implementations.""" + +import unittest + +from _framework.base import interfaces_test +from _framework.base import util +from _framework.base.packets import implementations +from _framework.foundation import logging_pool + +POOL_MAX_WORKERS = 100 +DEFAULT_TIMEOUT = 30 +MAXIMUM_TIMEOUT = 60 + + +class ImplementationsTest( + interfaces_test.FrontAndBackTest, unittest.TestCase): + + def setUp(self): + self.memory_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS) + self.front_work_pool = logging_pool.pool(POOL_MAX_WORKERS) + self.front_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS) + self.front_utility_pool = logging_pool.pool(POOL_MAX_WORKERS) + self.back_work_pool = logging_pool.pool(POOL_MAX_WORKERS) + self.back_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS) + self.back_utility_pool = logging_pool.pool(POOL_MAX_WORKERS) + self.test_pool = logging_pool.pool(POOL_MAX_WORKERS) + self.test_servicer = interfaces_test.TestServicer(self.test_pool) + self.front = implementations.front( + self.front_work_pool, self.front_transmission_pool, + self.front_utility_pool) + self.back = implementations.back( + self.test_servicer, self.back_work_pool, self.back_transmission_pool, + self.back_utility_pool, DEFAULT_TIMEOUT, MAXIMUM_TIMEOUT) + self.front.join_rear_link(self.back) + self.back.join_fore_link(self.front) + + def tearDown(self): + util.wait_for_idle(self.back) + util.wait_for_idle(self.front) + self.memory_transmission_pool.shutdown(wait=True) + self.front_work_pool.shutdown(wait=True) + self.front_transmission_pool.shutdown(wait=True) + self.front_utility_pool.shutdown(wait=True) + self.back_work_pool.shutdown(wait=True) + self.back_transmission_pool.shutdown(wait=True) + self.back_utility_pool.shutdown(wait=True) + self.test_pool.shutdown(wait=True) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/_framework/base/packets/in_memory.py b/src/python/_framework/base/packets/in_memory.py new file mode 100644 index 0000000000..17daf3acf7 --- /dev/null +++ b/src/python/_framework/base/packets/in_memory.py @@ -0,0 +1,108 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Entry points into the packet-exchange-based implementation the base layer.""" + +import threading + +from _framework.base.packets import _constants +from _framework.base.packets import interfaces +from _framework.foundation import callable_util + + +class _Serializer(object): + """A utility for serializing values that may arrive concurrently.""" + + def __init__(self, pool): + self._lock = threading.Lock() + self._pool = pool + self._sink = None + self._spinning = False + self._values = [] + + def _spin(self, sink, value): + while True: + sink(value) + with self._lock: + if self._sink is None or not self._values: + self._spinning = False + return + else: + sink, value = self._sink, self._values.pop(0) + + def set_sink(self, sink): + with self._lock: + self._sink = sink + if sink is not None and self._values and not self._spinning: + self._spinning = True + self._pool.submit( + callable_util.with_exceptions_logged( + self._spin, _constants.INTERNAL_ERROR_LOG_MESSAGE), + sink, self._values.pop(0)) + + def add_value(self, value): + with self._lock: + if self._sink and not self._spinning: + self._spinning = True + self._pool.submit( + callable_util.with_exceptions_logged( + self._spin, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._sink, value) + else: + self._values.append(value) + + +class Link(interfaces.ForeLink, interfaces.RearLink): + """A trivial implementation of interfaces.ForeLink and interfaces.RearLink.""" + + def __init__(self, pool): + """Constructor. + + Args: + pool: A thread pool to be used for serializing ticket exchange in each + direction. + """ + self._front_to_back = _Serializer(pool) + self._back_to_front = _Serializer(pool) + + def join_fore_link(self, fore_link): + """See interfaces.RearLink.join_fore_link for specification.""" + self._back_to_front.set_sink(fore_link.accept_back_to_front_ticket) + + def join_rear_link(self, rear_link): + """See interfaces.ForeLink.join_rear_link for specification.""" + self._front_to_back.set_sink(rear_link.accept_front_to_back_ticket) + + def accept_front_to_back_ticket(self, ticket): + """See interfaces.ForeLink.accept_front_to_back_ticket for specification.""" + self._front_to_back.add_value(ticket) + + def accept_back_to_front_ticket(self, ticket): + """See interfaces.RearLink.accept_back_to_front_ticket for specification.""" + self._back_to_front.add_value(ticket) diff --git a/src/python/_framework/base/packets/interfaces.py b/src/python/_framework/base/packets/interfaces.py new file mode 100644 index 0000000000..99f9e87772 --- /dev/null +++ b/src/python/_framework/base/packets/interfaces.py @@ -0,0 +1,84 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces defined and used by the base layer of RPC Framework.""" + +import abc + +# packets is referenced from specifications in this module. +from _framework.base import interfaces +from _framework.base.packets import packets # pylint: disable=unused-import + + +class ForeLink(object): + """Accepts back-to-front tickets and emits front-to-back tickets.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def accept_back_to_front_ticket(self, ticket): + """Accept a packets.BackToFrontPacket. + + Args: + ticket: Any packets.BackToFrontPacket. + """ + raise NotImplementedError() + + @abc.abstractmethod + def join_rear_link(self, rear_link): + """Mates this object with a peer with which it will exchange tickets.""" + raise NotImplementedError() + + +class RearLink(object): + """Accepts front-to-back tickets and emits back-to-front tickets.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def accept_front_to_back_ticket(self, ticket): + """Accepts a packets.FrontToBackPacket. + + Args: + ticket: Any packets.FrontToBackPacket. + """ + raise NotImplementedError() + + @abc.abstractmethod + def join_fore_link(self, fore_link): + """Mates this object with a peer with which it will exchange tickets.""" + raise NotImplementedError() + + +class Front(ForeLink, interfaces.Front): + """Clientish objects that operate by sending and receiving tickets.""" + __metaclass__ = abc.ABCMeta + + +class Back(RearLink, interfaces.Back): + """Serverish objects that operate by sending and receiving tickets.""" + __metaclass__ = abc.ABCMeta diff --git a/src/python/_framework/base/packets/null.py b/src/python/_framework/base/packets/null.py new file mode 100644 index 0000000000..9b40a00505 --- /dev/null +++ b/src/python/_framework/base/packets/null.py @@ -0,0 +1,56 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Null links that ignore tickets passed to them.""" + +from _framework.base.packets import interfaces + + +class _NullForeLink(interfaces.ForeLink): + """A do-nothing ForeLink.""" + + def accept_back_to_front_ticket(self, ticket): + pass + + def join_rear_link(self, rear_link): + raise NotImplementedError() + + +class _NullRearLink(interfaces.RearLink): + """A do-nothing RearLink.""" + + def accept_front_to_back_ticket(self, ticket): + pass + + def join_fore_link(self, fore_link): + raise NotImplementedError() + + +NULL_FORE_LINK = _NullForeLink() +NULL_REAR_LINK = _NullRearLink() diff --git a/src/python/_framework/base/packets/packets.py b/src/python/_framework/base/packets/packets.py new file mode 100644 index 0000000000..1315ca650e --- /dev/null +++ b/src/python/_framework/base/packets/packets.py @@ -0,0 +1,112 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Packets used between fronts and backs.""" + +import collections +import enum + +# interfaces is referenced from specifications in this module. +from _framework.base import interfaces # pylint: disable=unused-import + + +@enum.unique +class Kind(enum.Enum): + """Identifies the overall kind of a ticket.""" + + COMMENCEMENT = 'commencement' + CONTINUATION = 'continuation' + COMPLETION = 'completion' + ENTIRE = 'entire' + CANCELLATION = 'cancellation' + EXPIRATION = 'expiration' + SERVICER_FAILURE = 'servicer failure' + SERVICED_FAILURE = 'serviced failure' + RECEPTION_FAILURE = 'reception failure' + TRANSMISSION_FAILURE = 'transmission failure' + + +class FrontToBackPacket( + collections.namedtuple( + 'FrontToBackPacket', + ['operation_id', 'sequence_number', 'kind', 'name', 'subscription', + 'trace_id', 'payload', 'timeout'])): + """A sum type for all values sent from a front to a back. + + Attributes: + operation_id: A unique-with-respect-to-equality hashable object identifying + a particular operation. + sequence_number: A zero-indexed integer sequence number identifying the + packet's place among all the packets sent from front to back for this + particular operation. Must be zero if kind is Kind.COMMENCEMENT or + Kind.ENTIRE. Must be positive for any other kind. + kind: One of Kind.COMMENCEMENT, Kind.CONTINUATION, Kind.COMPLETION, + Kind.ENTIRE, Kind.CANCELLATION, Kind.EXPIRATION, Kind.SERVICED_FAILURE, + Kind.RECEPTION_FAILURE, or Kind.TRANSMISSION_FAILURE. + name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT + or Kind.ENTIRE. Must be None for any other kind. + subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or + interfaces.NONE describing the interest the front has in packets sent from + the back. Must be present if kind is Kind.COMMENCEMENT or Kind.ENTIRE. + Must be None for any other kind. + trace_id: A uuid.UUID identifying a set of related operations to which this + operation belongs. May be None. + payload: A customer payload object. Must be present if kind is + Kind.CONTINUATION. Must be None if kind is Kind.CANCELLATION. May be None + for any other kind. + timeout: An optional length of time (measured from the beginning of the + operation) to allow for the entire operation. If None, a default value on + the back will be used. If present and excessively large, the back may + limit the operation to a smaller duration of its choice. May be present + for any ticket kind; setting a value on a later ticket allows fronts + to request time extensions (or even time reductions!) on in-progress + operations. + """ + + +class BackToFrontPacket( + collections.namedtuple( + 'BackToFrontPacket', + ['operation_id', 'sequence_number', 'kind', 'payload'])): + """A sum type for all values sent from a back to a front. + + Attributes: + operation_id: A unique-with-respect-to-equality hashable object identifying + a particular operation. + sequence_number: A zero-indexed integer sequence number identifying the + packet's place among all the packets sent from back to front for this + particular operation. + kind: One of Kind.CONTINUATION, Kind.COMPLETION, Kind.EXPIRATION, + Kind.SERVICER_FAILURE, Kind.RECEPTION_FAILURE, or + Kind.TRANSMISSION_FAILURE. + payload: A customer payload object. Must be present if kind is + Kind.CONTINUATION. May be None if kind is Kind.COMPLETION. Must be None if + kind is Kind.EXPIRATION, Kind.SERVICER_FAILURE, Kind.RECEPTION_FAILURE, or + Kind.TRANSMISSION_FAILURE. + """ diff --git a/src/python/_framework/base/util.py b/src/python/_framework/base/util.py new file mode 100644 index 0000000000..6bbd18a59a --- /dev/null +++ b/src/python/_framework/base/util.py @@ -0,0 +1,91 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities helpful for working with the base layer of RPC Framework.""" + +import collections +import threading + +from _framework.base import interfaces + + +class _ServicedSubscription( + collections.namedtuple('_ServicedSubscription', ['category', 'ingestor']), + interfaces.ServicedSubscription): + """See interfaces.ServicedSubscription for specification.""" + +_NONE_SUBSCRIPTION = _ServicedSubscription(interfaces.NONE, None) +_TERMINATION_ONLY_SUBSCRIPTION = _ServicedSubscription( + interfaces.TERMINATION_ONLY, None) + + +def none_serviced_subscription(): + """Creates a "none" interfaces.ServicedSubscription object. + + Returns: + An interfaces.ServicedSubscription indicating no subscription to an + operation's results (such as would be the case for a fire-and-forget + operation invocation). + """ + return _NONE_SUBSCRIPTION + + +def termination_only_serviced_subscription(): + """Creates a "termination only" interfaces.ServicedSubscription object. + + Returns: + An interfaces.ServicedSubscription indicating that the front-side customer + is interested only in the overall termination outcome of the operation + (such as completion or expiration) and would ignore the actual results of + the operation. + """ + return _TERMINATION_ONLY_SUBSCRIPTION + + +def full_serviced_subscription(ingestor): + """Creates a "full" interfaces.ServicedSubscription object. + + Args: + ingestor: A ServicedIngestor. + + Returns: + A ServicedSubscription object indicating a full subscription. + """ + return _ServicedSubscription(interfaces.FULL, ingestor) + + +def wait_for_idle(end): + """Waits for an interfaces.End to complete all operations. + + Args: + end: Any interfaces.End. + """ + event = threading.Event() + end.add_idle_action(event.set) + event.wait() diff --git a/src/python/_framework/common/__init__.py b/src/python/_framework/common/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/_framework/common/__init__.py diff --git a/src/python/_framework/common/cardinality.py b/src/python/_framework/common/cardinality.py new file mode 100644 index 0000000000..610425e803 --- /dev/null +++ b/src/python/_framework/common/cardinality.py @@ -0,0 +1,42 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Defines an enum for classifying RPC methods by streaming semantics.""" + +import enum + + +@enum.unique +class Cardinality(enum.Enum): + """Describes the streaming semantics of an RPC method.""" + + UNARY_UNARY = 'request-unary/response-unary' + UNARY_STREAM = 'request-unary/response-streaming' + STREAM_UNARY = 'request-streaming/response-unary' + STREAM_STREAM = 'request-streaming/response-streaming' diff --git a/src/python/_framework/face/__init__.py b/src/python/_framework/face/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/_framework/face/__init__.py diff --git a/src/python/_framework/face/_calls.py b/src/python/_framework/face/_calls.py new file mode 100644 index 0000000000..ab58e6378b --- /dev/null +++ b/src/python/_framework/face/_calls.py @@ -0,0 +1,310 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utility functions for invoking RPCs.""" + +import threading + +from _framework.base import interfaces as base_interfaces +from _framework.base import util as base_util +from _framework.face import _control +from _framework.face import interfaces +from _framework.foundation import callable_util +from _framework.foundation import future + +_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!' +_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!' + + +class _RendezvousServicedIngestor(base_interfaces.ServicedIngestor): + + def __init__(self, rendezvous): + self._rendezvous = rendezvous + + def consumer(self, operation_context): + return self._rendezvous + + +class _EventServicedIngestor(base_interfaces.ServicedIngestor): + + def __init__(self, result_consumer, abortion_callback): + self._result_consumer = result_consumer + self._abortion_callback = abortion_callback + + def consumer(self, operation_context): + operation_context.add_termination_callback( + _control.as_operation_termination_callback(self._abortion_callback)) + return self._result_consumer + + +def _rendezvous_subscription(rendezvous): + return base_util.full_serviced_subscription( + _RendezvousServicedIngestor(rendezvous)) + + +def _unary_event_subscription(completion_callback, abortion_callback): + return base_util.full_serviced_subscription( + _EventServicedIngestor( + _control.UnaryConsumer(completion_callback), abortion_callback)) + + +def _stream_event_subscription(result_consumer, abortion_callback): + return base_util.full_serviced_subscription( + _EventServicedIngestor(result_consumer, abortion_callback)) + + +class _OperationCancellableIterator(interfaces.CancellableIterator): + """An interfaces.CancellableIterator for response-streaming operations.""" + + def __init__(self, rendezvous, operation): + self._rendezvous = rendezvous + self._operation = operation + + def __iter__(self): + return self + + def next(self): + return next(self._rendezvous) + + def cancel(self): + self._operation.cancel() + self._rendezvous.set_outcome(base_interfaces.CANCELLED) + + +class _OperationFuture(future.Future): + """A future.Future interface to an operation.""" + + def __init__(self, rendezvous, operation): + self._condition = threading.Condition() + self._rendezvous = rendezvous + self._operation = operation + + self._outcome = None + self._callbacks = [] + + def cancel(self): + """See future.Future.cancel for specification.""" + with self._condition: + if self._outcome is None: + self._operation.cancel() + self._outcome = future.aborted() + self._condition.notify_all() + return False + + def cancelled(self): + """See future.Future.cancelled for specification.""" + return False + + def done(self): + """See future.Future.done for specification.""" + with self._condition: + return (self._outcome is not None and + self._outcome.category is not future.ABORTED) + + def outcome(self): + """See future.Future.outcome for specification.""" + with self._condition: + while self._outcome is None: + self._condition.wait() + return self._outcome + + def add_done_callback(self, callback): + """See future.Future.add_done_callback for specification.""" + with self._condition: + if self._callbacks is not None: + self._callbacks.add(callback) + return + + outcome = self._outcome + + callable_util.call_logging_exceptions( + callback, _DONE_CALLBACK_LOG_MESSAGE, outcome) + + def on_operation_termination(self, operation_outcome): + """Indicates to this object that the operation has terminated. + + Args: + operation_outcome: One of base_interfaces.COMPLETED, + base_interfaces.CANCELLED, base_interfaces.EXPIRED, + base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE, + base_interfaces.SERVICED_FAILURE, or base_interfaces.SERVICER_FAILURE + indicating the categorical outcome of the operation. + """ + with self._condition: + if (self._outcome is None and + operation_outcome != base_interfaces.COMPLETED): + self._outcome = future.raised( + _control.abortion_outcome_to_exception(operation_outcome)) + self._condition.notify_all() + + outcome = self._outcome + rendezvous = self._rendezvous + callbacks = list(self._callbacks) + self._callbacks = None + + if outcome is None: + try: + return_value = next(rendezvous) + except Exception as e: # pylint: disable=broad-except + outcome = future.raised(e) + else: + outcome = future.returned(return_value) + with self._condition: + if self._outcome is None: + self._outcome = outcome + self._condition.notify_all() + else: + outcome = self._outcome + + for callback in callbacks: + callable_util.call_logging_exceptions( + callback, _DONE_CALLBACK_LOG_MESSAGE, outcome) + + +class _Call(interfaces.Call): + + def __init__(self, operation): + self._operation = operation + self.context = _control.RpcContext(operation.context) + + def cancel(self): + self._operation.cancel() + + +def blocking_value_in_value_out(front, name, payload, timeout, trace_id): + """Services in a blocking fashion a value-in value-out servicer method.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate( + name, payload, True, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + return next(rendezvous) + + +def future_value_in_value_out(front, name, payload, timeout, trace_id): + """Services a value-in value-out servicer method by returning a Future.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate( + name, payload, True, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + operation_future = _OperationFuture(rendezvous, operation) + operation.context.add_termination_callback( + operation_future.on_operation_termination) + return operation_future + + +def inline_value_in_stream_out(front, name, payload, timeout, trace_id): + """Services a value-in stream-out servicer method.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate( + name, payload, True, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + return _OperationCancellableIterator(rendezvous, operation) + + +def blocking_stream_in_value_out( + front, name, payload_iterator, timeout, trace_id): + """Services in a blocking fashion a stream-in value-out servicer method.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate(name, None, False, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + for payload in payload_iterator: + operation.consumer.consume(payload) + operation.consumer.terminate() + return next(rendezvous) + + +def future_stream_in_value_out( + front, name, payload_iterator, timeout, trace_id, pool): + """Services a stream-in value-out servicer method by returning a Future.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate(name, None, False, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + pool.submit( + callable_util.with_exceptions_logged( + _control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE), + payload_iterator, operation.consumer, lambda: True, True) + operation_future = _OperationFuture(rendezvous, operation) + operation.context.add_termination_callback( + operation_future.on_operation_termination) + return operation_future + + +def inline_stream_in_stream_out( + front, name, payload_iterator, timeout, trace_id, pool): + """Services a stream-in stream-out servicer method.""" + rendezvous = _control.Rendezvous() + subscription = _rendezvous_subscription(rendezvous) + operation = front.operate(name, None, False, timeout, subscription, trace_id) + operation.context.add_termination_callback(rendezvous.set_outcome) + pool.submit( + callable_util.with_exceptions_logged( + _control.pipe_iterator_to_consumer, _ITERATOR_EXCEPTION_LOG_MESSAGE), + payload_iterator, operation.consumer, lambda: True, True) + return _OperationCancellableIterator(rendezvous, operation) + + +def event_value_in_value_out( + front, name, payload, completion_callback, abortion_callback, timeout, + trace_id): + subscription = _unary_event_subscription( + completion_callback, abortion_callback) + operation = front.operate( + name, payload, True, timeout, subscription, trace_id) + return _Call(operation) + + +def event_value_in_stream_out( + front, name, payload, result_payload_consumer, abortion_callback, timeout, + trace_id): + subscription = _stream_event_subscription( + result_payload_consumer, abortion_callback) + operation = front.operate( + name, payload, True, timeout, subscription, trace_id) + return _Call(operation) + + +def event_stream_in_value_out( + front, name, completion_callback, abortion_callback, timeout, trace_id): + subscription = _unary_event_subscription( + completion_callback, abortion_callback) + operation = front.operate(name, None, False, timeout, subscription, trace_id) + return _Call(operation), operation.consumer + + +def event_stream_in_stream_out( + front, name, result_payload_consumer, abortion_callback, timeout, trace_id): + subscription = _stream_event_subscription( + result_payload_consumer, abortion_callback) + operation = front.operate(name, None, False, timeout, subscription, trace_id) + return _Call(operation), operation.consumer diff --git a/src/python/_framework/face/_control.py b/src/python/_framework/face/_control.py new file mode 100644 index 0000000000..2c221321d6 --- /dev/null +++ b/src/python/_framework/face/_control.py @@ -0,0 +1,194 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for translating between sync and async control flow.""" + +import threading + +from _framework.base import interfaces as base_interfaces +from _framework.face import exceptions +from _framework.face import interfaces +from _framework.foundation import abandonment +from _framework.foundation import stream + +INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Face) Internal Error! :-(' + +_OPERATION_OUTCOME_TO_RPC_ABORTION = { + base_interfaces.CANCELLED: interfaces.CANCELLED, + base_interfaces.EXPIRED: interfaces.EXPIRED, + base_interfaces.RECEPTION_FAILURE: interfaces.NETWORK_FAILURE, + base_interfaces.TRANSMISSION_FAILURE: interfaces.NETWORK_FAILURE, + base_interfaces.SERVICED_FAILURE: interfaces.SERVICED_FAILURE, + base_interfaces.SERVICER_FAILURE: interfaces.SERVICER_FAILURE, + } + + +def _as_operation_termination_callback(rpc_abortion_callback): + def operation_termination_callback(operation_outcome): + rpc_abortion = _OPERATION_OUTCOME_TO_RPC_ABORTION.get( + operation_outcome, None) + if rpc_abortion is not None: + rpc_abortion_callback(rpc_abortion) + return operation_termination_callback + + +def _abortion_outcome_to_exception(abortion_outcome): + if abortion_outcome == base_interfaces.CANCELLED: + return exceptions.CancellationError() + elif abortion_outcome == base_interfaces.EXPIRED: + return exceptions.ExpirationError() + elif abortion_outcome == base_interfaces.SERVICER_FAILURE: + return exceptions.ServicerError() + elif abortion_outcome == base_interfaces.SERVICED_FAILURE: + return exceptions.ServicedError() + else: + return exceptions.NetworkError() + + +class UnaryConsumer(stream.Consumer): + """A stream.Consumer that should only ever be passed one value.""" + + def __init__(self, on_termination): + self._on_termination = on_termination + self._value = None + + def consume(self, value): + self._value = value + + def terminate(self): + self._on_termination(self._value) + + def consume_and_terminate(self, value): + self._on_termination(value) + + +class Rendezvous(stream.Consumer): + """A rendez-vous with stream.Consumer and iterator interfaces.""" + + def __init__(self): + self._condition = threading.Condition() + self._values = [] + self._values_completed = False + self._abortion = None + + def consume(self, value): + with self._condition: + self._values.append(value) + self._condition.notify() + + def terminate(self): + with self._condition: + self._values_completed = True + self._condition.notify() + + def consume_and_terminate(self, value): + with self._condition: + self._values.append(value) + self._values_completed = True + self._condition.notify() + + def __iter__(self): + return self + + def next(self): + with self._condition: + while ((self._abortion is None) and + (not self._values) and + (not self._values_completed)): + self._condition.wait() + if self._abortion is not None: + raise _abortion_outcome_to_exception(self._abortion) + elif self._values: + return self._values.pop(0) + elif self._values_completed: + raise StopIteration() + else: + raise AssertionError('Unreachable code reached!') + + def set_outcome(self, outcome): + with self._condition: + if outcome != base_interfaces.COMPLETED: + self._abortion = outcome + self._condition.notify() + + +class RpcContext(interfaces.RpcContext): + """A wrapped base_interfaces.OperationContext.""" + + def __init__(self, operation_context): + self._operation_context = operation_context + + def is_active(self): + return self._operation_context.is_active() + + def time_remaining(self): + return self._operation_context.time_remaining() + + def add_abortion_callback(self, abortion_callback): + self._operation_context.add_termination_callback( + _as_operation_termination_callback(abortion_callback)) + + +def pipe_iterator_to_consumer(iterator, consumer, active, terminate): + """Pipes values emitted from an iterator to a stream.Consumer. + + Args: + iterator: An iterator from which values will be emitted. + consumer: A stream.Consumer to which values will be passed. + active: A no-argument callable that returns True if the work being done by + this function is still valid and should not be abandoned and False if the + work being done by this function should be abandoned. + terminate: A boolean indicating whether or not this function should + terminate the given consumer after passing to it all values emitted by the + given iterator. + + Raises: + abandonment.Abandoned: If this function quits early after seeing False + returned by the active function passed to it. + Exception: This function raises whatever exceptions are raised by iterating + over the given iterator. + """ + for element in iterator: + if not active(): + raise abandonment.Abandoned() + + consumer.consume(element) + + if not active(): + raise abandonment.Abandoned() + if terminate: + consumer.terminate() + + +def abortion_outcome_to_exception(abortion_outcome): + return _abortion_outcome_to_exception(abortion_outcome) + + +def as_operation_termination_callback(rpc_abortion_callback): + return _as_operation_termination_callback(rpc_abortion_callback) diff --git a/src/python/_framework/face/_service.py b/src/python/_framework/face/_service.py new file mode 100644 index 0000000000..d758c2f148 --- /dev/null +++ b/src/python/_framework/face/_service.py @@ -0,0 +1,189 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Behaviors for servicing RPCs.""" + +# base_interfaces and interfaces are referenced from specification in this +# module. +from _framework.base import interfaces as base_interfaces # pylint: disable=unused-import +from _framework.face import _control +from _framework.face import exceptions +from _framework.face import interfaces # pylint: disable=unused-import +from _framework.foundation import abandonment +from _framework.foundation import callable_util +from _framework.foundation import stream +from _framework.foundation import stream_util + + +class _ValueInStreamOutConsumer(stream.Consumer): + """A stream.Consumer that maps inputs one-to-many onto outputs.""" + + def __init__(self, behavior, context, downstream): + """Constructor. + + Args: + behavior: A callable that takes a single value and an + interfaces.RpcContext and returns a generator of arbitrarily many + values. + context: An interfaces.RpcContext. + downstream: A stream.Consumer to which to pass the values generated by the + given behavior. + """ + self._behavior = behavior + self._context = context + self._downstream = downstream + + def consume(self, value): + _control.pipe_iterator_to_consumer( + self._behavior(value, self._context), self._downstream, + self._context.is_active, False) + + def terminate(self): + self._downstream.terminate() + + def consume_and_terminate(self, value): + _control.pipe_iterator_to_consumer( + self._behavior(value, self._context), self._downstream, + self._context.is_active, True) + + +def _pool_wrap(behavior, operation_context): + """Wraps an operation-related behavior so that it may be called in a pool. + + Args: + behavior: A callable related to carrying out an operation. + operation_context: A base_interfaces.OperationContext for the operation. + + Returns: + A callable that when called carries out the behavior of the given callable + and handles whatever exceptions it raises appropriately. + """ + def translation(*args): + try: + behavior(*args) + except ( + abandonment.Abandoned, + exceptions.ExpirationError, + exceptions.CancellationError, + exceptions.ServicedError, + exceptions.NetworkError) as e: + if operation_context.is_active(): + operation_context.fail(e) + except Exception as e: + operation_context.fail(e) + return callable_util.with_exceptions_logged( + translation, _control.INTERNAL_ERROR_LOG_MESSAGE) + + +def adapt_inline_value_in_value_out(method): + def adaptation(response_consumer, operation_context): + rpc_context = _control.RpcContext(operation_context) + return stream_util.TransformingConsumer( + lambda request: method.service(request, rpc_context), response_consumer) + return adaptation + + +def adapt_inline_value_in_stream_out(method): + def adaptation(response_consumer, operation_context): + rpc_context = _control.RpcContext(operation_context) + return _ValueInStreamOutConsumer( + method.service, rpc_context, response_consumer) + return adaptation + + +def adapt_inline_stream_in_value_out(method, pool): + def adaptation(response_consumer, operation_context): + rendezvous = _control.Rendezvous() + operation_context.add_termination_callback(rendezvous.set_outcome) + def in_pool_thread(): + response_consumer.consume_and_terminate( + method.service(rendezvous, _control.RpcContext(operation_context))) + pool.submit(_pool_wrap(in_pool_thread, operation_context)) + return rendezvous + return adaptation + + +def adapt_inline_stream_in_stream_out(method, pool): + """Adapts an interfaces.InlineStreamInStreamOutMethod for use with Consumers. + + RPCs may be serviced by calling the return value of this function, passing + request values to the stream.Consumer returned from that call, and receiving + response values from the stream.Consumer passed to that call. + + Args: + method: An interfaces.InlineStreamInStreamOutMethod. + pool: A thread pool. + + Returns: + A callable that takes a stream.Consumer and a + base_interfaces.OperationContext and returns a stream.Consumer. + """ + def adaptation(response_consumer, operation_context): + rendezvous = _control.Rendezvous() + operation_context.add_termination_callback(rendezvous.set_outcome) + def in_pool_thread(): + _control.pipe_iterator_to_consumer( + method.service(rendezvous, _control.RpcContext(operation_context)), + response_consumer, operation_context.is_active, True) + pool.submit(_pool_wrap(in_pool_thread, operation_context)) + return rendezvous + return adaptation + + +def adapt_event_value_in_value_out(method): + def adaptation(response_consumer, operation_context): + def on_payload(payload): + method.service( + payload, response_consumer.consume_and_terminate, + _control.RpcContext(operation_context)) + return _control.UnaryConsumer(on_payload) + return adaptation + + +def adapt_event_value_in_stream_out(method): + def adaptation(response_consumer, operation_context): + def on_payload(payload): + method.service( + payload, response_consumer, _control.RpcContext(operation_context)) + return _control.UnaryConsumer(on_payload) + return adaptation + + +def adapt_event_stream_in_value_out(method): + def adaptation(response_consumer, operation_context): + rpc_context = _control.RpcContext(operation_context) + return method.service(response_consumer.consume_and_terminate, rpc_context) + return adaptation + + +def adapt_event_stream_in_stream_out(method): + def adaptation(response_consumer, operation_context): + return method.service( + response_consumer, _control.RpcContext(operation_context)) + return adaptation diff --git a/src/python/_framework/face/_test_case.py b/src/python/_framework/face/_test_case.py new file mode 100644 index 0000000000..50b55c389f --- /dev/null +++ b/src/python/_framework/face/_test_case.py @@ -0,0 +1,81 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Common lifecycle code for in-memory-ticket-exchange Face-layer tests.""" + +from _framework.face import implementations +from _framework.face.testing import base_util +from _framework.face.testing import test_case +from _framework.foundation import logging_pool + +_TIMEOUT = 3 +_MAXIMUM_POOL_SIZE = 100 + + +class FaceTestCase(test_case.FaceTestCase): + """Provides abstract Face-layer tests an in-memory implementation.""" + + def set_up_implementation( + self, + name, + methods, + inline_value_in_value_out_methods, + inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods, + event_value_in_value_out_methods, + event_value_in_stream_out_methods, + event_stream_in_value_out_methods, + event_stream_in_stream_out_methods, + multi_method): + servicer_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE) + stub_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE) + + servicer = implementations.servicer( + servicer_pool, + inline_value_in_value_out_methods=inline_value_in_value_out_methods, + inline_value_in_stream_out_methods=inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods=inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods, + event_value_in_value_out_methods=event_value_in_value_out_methods, + event_value_in_stream_out_methods=event_value_in_stream_out_methods, + event_stream_in_value_out_methods=event_stream_in_value_out_methods, + event_stream_in_stream_out_methods=event_stream_in_stream_out_methods, + multi_method=multi_method) + + linked_pair = base_util.linked_pair(servicer, _TIMEOUT) + server = implementations.server() + stub = implementations.stub(linked_pair.front, stub_pool) + return server, stub, (servicer_pool, stub_pool, linked_pair) + + def tear_down_implementation(self, memo): + servicer_pool, stub_pool, linked_pair = memo + linked_pair.shut_down() + stub_pool.shutdown(wait=True) + servicer_pool.shutdown(wait=True) diff --git a/src/python/_framework/face/blocking_invocation_inline_service_test.py b/src/python/_framework/face/blocking_invocation_inline_service_test.py new file mode 100644 index 0000000000..96563c94ee --- /dev/null +++ b/src/python/_framework/face/blocking_invocation_inline_service_test.py @@ -0,0 +1,46 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""One of the tests of the Face layer of RPC Framework.""" + +import unittest + +from _framework.face import _test_case +from _framework.face.testing import blocking_invocation_inline_service_test_case as test_case + + +class BlockingInvocationInlineServiceTest( + _test_case.FaceTestCase, + test_case.BlockingInvocationInlineServiceTestCase, + unittest.TestCase): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/_framework/face/demonstration.py b/src/python/_framework/face/demonstration.py new file mode 100644 index 0000000000..501ec6b3f8 --- /dev/null +++ b/src/python/_framework/face/demonstration.py @@ -0,0 +1,118 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Demonstration-suitable implementation of the face layer of RPC Framework.""" + +from _framework.base import util as _base_util +from _framework.base.packets import implementations as _tickets_implementations +from _framework.face import implementations +from _framework.foundation import logging_pool + +_POOL_SIZE_LIMIT = 20 + +_MAXIMUM_TIMEOUT = 90 + + +class LinkedPair(object): + """A Server and Stub that are linked to one another. + + Attributes: + server: A Server. + stub: A Stub. + """ + + def shut_down(self): + """Shuts down this object and releases its resources.""" + raise NotImplementedError() + + +class _LinkedPair(LinkedPair): + + def __init__(self, server, stub, front, back, pools): + self.server = server + self.stub = stub + self._front = front + self._back = back + self._pools = pools + + def shut_down(self): + _base_util.wait_for_idle(self._front) + _base_util.wait_for_idle(self._back) + + for pool in self._pools: + pool.shutdown(wait=True) + + +def server_and_stub( + default_timeout, + inline_value_in_value_out_methods=None, + inline_value_in_stream_out_methods=None, + inline_stream_in_value_out_methods=None, + inline_stream_in_stream_out_methods=None, + event_value_in_value_out_methods=None, + event_value_in_stream_out_methods=None, + event_stream_in_value_out_methods=None, + event_stream_in_stream_out_methods=None, + multi_method=None): + """Creates a Server and Stub linked together for use.""" + front_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + stub_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + pools = ( + front_work_pool, front_transmission_pool, front_utility_pool, + back_work_pool, back_transmission_pool, back_utility_pool, + stub_pool) + + servicer = implementations.servicer( + back_work_pool, + inline_value_in_value_out_methods=inline_value_in_value_out_methods, + inline_value_in_stream_out_methods=inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods=inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods=inline_stream_in_stream_out_methods, + event_value_in_value_out_methods=event_value_in_value_out_methods, + event_value_in_stream_out_methods=event_value_in_stream_out_methods, + event_stream_in_value_out_methods=event_stream_in_value_out_methods, + event_stream_in_stream_out_methods=event_stream_in_stream_out_methods, + multi_method=multi_method) + + front = _tickets_implementations.front( + front_work_pool, front_transmission_pool, front_utility_pool) + back = _tickets_implementations.back( + servicer, back_work_pool, back_transmission_pool, back_utility_pool, + default_timeout, _MAXIMUM_TIMEOUT) + front.join_rear_link(back) + back.join_fore_link(front) + + stub = implementations.stub(front, stub_pool) + + return _LinkedPair(implementations.server(), stub, front, back, pools) diff --git a/src/python/_framework/face/event_invocation_synchronous_event_service_test.py b/src/python/_framework/face/event_invocation_synchronous_event_service_test.py new file mode 100644 index 0000000000..48e05b2478 --- /dev/null +++ b/src/python/_framework/face/event_invocation_synchronous_event_service_test.py @@ -0,0 +1,46 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""One of the tests of the Face layer of RPC Framework.""" + +import unittest + +from _framework.face import _test_case +from _framework.face.testing import event_invocation_synchronous_event_service_test_case as test_case + + +class EventInvocationSynchronousEventServiceTest( + _test_case.FaceTestCase, + test_case.EventInvocationSynchronousEventServiceTestCase, + unittest.TestCase): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/_framework/face/exceptions.py b/src/python/_framework/face/exceptions.py new file mode 100644 index 0000000000..f112df70bc --- /dev/null +++ b/src/python/_framework/face/exceptions.py @@ -0,0 +1,77 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Exceptions used in the Face layer of RPC Framework.""" + +import abc + + +class NoSuchMethodError(Exception): + """Raised by customer code to indicate an unrecognized RPC method name. + + Attributes: + name: The unrecognized name. + """ + + def __init__(self, name): + """Constructor. + + Args: + name: The unrecognized RPC method name. + """ + super(NoSuchMethodError, self).__init__() + self.name = name + + +class RpcError(Exception): + """Common super type for all exceptions raised by the Face layer. + + Only RPC Framework should instantiate and raise these exceptions. + """ + __metaclass__ = abc.ABCMeta + + +class CancellationError(RpcError): + """Indicates that an RPC has been cancelled.""" + + +class ExpirationError(RpcError): + """Indicates that an RPC has expired ("timed out").""" + + +class NetworkError(RpcError): + """Indicates that some error occurred on the network.""" + + +class ServicedError(RpcError): + """Indicates that the Serviced failed in the course of an RPC.""" + + +class ServicerError(RpcError): + """Indicates that the Servicer failed in the course of servicing an RPC.""" diff --git a/src/python/_framework/face/future_invocation_asynchronous_event_service_test.py b/src/python/_framework/face/future_invocation_asynchronous_event_service_test.py new file mode 100644 index 0000000000..96f5fe85d3 --- /dev/null +++ b/src/python/_framework/face/future_invocation_asynchronous_event_service_test.py @@ -0,0 +1,46 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""One of the tests of the Face layer of RPC Framework.""" + +import unittest + +from _framework.face import _test_case +from _framework.face.testing import future_invocation_asynchronous_event_service_test_case as test_case + + +class FutureInvocationAsynchronousEventServiceTest( + _test_case.FaceTestCase, + test_case.FutureInvocationAsynchronousEventServiceTestCase, + unittest.TestCase): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/_framework/face/implementations.py b/src/python/_framework/face/implementations.py new file mode 100644 index 0000000000..94362e2007 --- /dev/null +++ b/src/python/_framework/face/implementations.py @@ -0,0 +1,246 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Entry points into the Face layer of RPC Framework.""" + +from _framework.base import exceptions as _base_exceptions +from _framework.base import interfaces as base_interfaces +from _framework.face import _calls +from _framework.face import _service +from _framework.face import exceptions +from _framework.face import interfaces + + +class _BaseServicer(base_interfaces.Servicer): + + def __init__(self, methods, multi_method): + self._methods = methods + self._multi_method = multi_method + + def service(self, name, context, output_consumer): + method = self._methods.get(name, None) + if method is not None: + return method(output_consumer, context) + elif self._multi_method is not None: + try: + return self._multi_method.service(name, output_consumer, context) + except exceptions.NoSuchMethodError: + raise _base_exceptions.NoSuchMethodError() + else: + raise _base_exceptions.NoSuchMethodError() + + +class _Server(interfaces.Server): + """An interfaces.Server implementation.""" + + +class _Stub(interfaces.Stub): + """An interfaces.Stub implementation.""" + + def __init__(self, front, pool): + self._front = front + self._pool = pool + + def blocking_value_in_value_out(self, name, request, timeout): + return _calls.blocking_value_in_value_out( + self._front, name, request, timeout, 'unused trace ID') + + def future_value_in_value_out(self, name, request, timeout): + return _calls.future_value_in_value_out( + self._front, name, request, timeout, 'unused trace ID') + + def inline_value_in_stream_out(self, name, request, timeout): + return _calls.inline_value_in_stream_out( + self._front, name, request, timeout, 'unused trace ID') + + def blocking_stream_in_value_out(self, name, request_iterator, timeout): + return _calls.blocking_stream_in_value_out( + self._front, name, request_iterator, timeout, 'unused trace ID') + + def future_stream_in_value_out(self, name, request_iterator, timeout): + return _calls.future_stream_in_value_out( + self._front, name, request_iterator, timeout, 'unused trace ID', + self._pool) + + def inline_stream_in_stream_out(self, name, request_iterator, timeout): + return _calls.inline_stream_in_stream_out( + self._front, name, request_iterator, timeout, 'unused trace ID', + self._pool) + + def event_value_in_value_out( + self, name, request, response_callback, abortion_callback, timeout): + return _calls.event_value_in_value_out( + self._front, name, request, response_callback, abortion_callback, + timeout, 'unused trace ID') + + def event_value_in_stream_out( + self, name, request, response_consumer, abortion_callback, timeout): + return _calls.event_value_in_stream_out( + self._front, name, request, response_consumer, abortion_callback, + timeout, 'unused trace ID') + + def event_stream_in_value_out( + self, name, response_callback, abortion_callback, timeout): + return _calls.event_stream_in_value_out( + self._front, name, response_callback, abortion_callback, timeout, + 'unused trace ID') + + def event_stream_in_stream_out( + self, name, response_consumer, abortion_callback, timeout): + return _calls.event_stream_in_stream_out( + self._front, name, response_consumer, abortion_callback, timeout, + 'unused trace ID') + + +def _aggregate_methods( + pool, + inline_value_in_value_out_methods, + inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods, + event_value_in_value_out_methods, + event_value_in_stream_out_methods, + event_stream_in_value_out_methods, + event_stream_in_stream_out_methods): + """Aggregates methods coded in according to different interfaces.""" + methods = {} + + def adapt_unpooled_methods(adapted_methods, unadapted_methods, adaptation): + if unadapted_methods is not None: + for name, unadapted_method in unadapted_methods.iteritems(): + adapted_methods[name] = adaptation(unadapted_method) + + def adapt_pooled_methods(adapted_methods, unadapted_methods, adaptation): + if unadapted_methods is not None: + for name, unadapted_method in unadapted_methods.iteritems(): + adapted_methods[name] = adaptation(unadapted_method, pool) + + adapt_unpooled_methods( + methods, inline_value_in_value_out_methods, + _service.adapt_inline_value_in_value_out) + adapt_unpooled_methods( + methods, inline_value_in_stream_out_methods, + _service.adapt_inline_value_in_stream_out) + adapt_pooled_methods( + methods, inline_stream_in_value_out_methods, + _service.adapt_inline_stream_in_value_out) + adapt_pooled_methods( + methods, inline_stream_in_stream_out_methods, + _service.adapt_inline_stream_in_stream_out) + adapt_unpooled_methods( + methods, event_value_in_value_out_methods, + _service.adapt_event_value_in_value_out) + adapt_unpooled_methods( + methods, event_value_in_stream_out_methods, + _service.adapt_event_value_in_stream_out) + adapt_unpooled_methods( + methods, event_stream_in_value_out_methods, + _service.adapt_event_stream_in_value_out) + adapt_unpooled_methods( + methods, event_stream_in_stream_out_methods, + _service.adapt_event_stream_in_stream_out) + + return methods + + +def servicer( + pool, + inline_value_in_value_out_methods=None, + inline_value_in_stream_out_methods=None, + inline_stream_in_value_out_methods=None, + inline_stream_in_stream_out_methods=None, + event_value_in_value_out_methods=None, + event_value_in_stream_out_methods=None, + event_stream_in_value_out_methods=None, + event_stream_in_stream_out_methods=None, + multi_method=None): + """Creates a base_interfaces.Servicer. + + The key sets of the passed dictionaries must be disjoint. It is guaranteed + that any passed MultiMethod implementation will only be called to service an + RPC if the RPC method name is not present in the key sets of the passed + dictionaries. + + Args: + pool: A thread pool. + inline_value_in_value_out_methods: A dictionary mapping method names to + interfaces.InlineValueInValueOutMethod implementations. + inline_value_in_stream_out_methods: A dictionary mapping method names to + interfaces.InlineValueInStreamOutMethod implementations. + inline_stream_in_value_out_methods: A dictionary mapping method names to + interfaces.InlineStreamInValueOutMethod implementations. + inline_stream_in_stream_out_methods: A dictionary mapping method names to + interfaces.InlineStreamInStreamOutMethod implementations. + event_value_in_value_out_methods: A dictionary mapping method names to + interfaces.EventValueInValueOutMethod implementations. + event_value_in_stream_out_methods: A dictionary mapping method names to + interfaces.EventValueInStreamOutMethod implementations. + event_stream_in_value_out_methods: A dictionary mapping method names to + interfaces.EventStreamInValueOutMethod implementations. + event_stream_in_stream_out_methods: A dictionary mapping method names to + interfaces.EventStreamInStreamOutMethod implementations. + multi_method: An implementation of interfaces.MultiMethod. + + Returns: + A base_interfaces.Servicer that services RPCs via the given implementations. + """ + methods = _aggregate_methods( + pool, + inline_value_in_value_out_methods, + inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods, + event_value_in_value_out_methods, + event_value_in_stream_out_methods, + event_stream_in_value_out_methods, + event_stream_in_stream_out_methods) + + return _BaseServicer(methods, multi_method) + + +def server(): + """Creates an interfaces.Server. + + Returns: + An interfaces.Server. + """ + return _Server() + + +def stub(front, pool): + """Creates an interfaces.Stub. + + Args: + front: A base_interfaces.Front. + pool: A futures.ThreadPoolExecutor. + + Returns: + An interfaces.Stub that performs RPCs via the given base_interfaces.Front. + """ + return _Stub(front, pool) diff --git a/src/python/_framework/face/interfaces.py b/src/python/_framework/face/interfaces.py new file mode 100644 index 0000000000..0cc7c70df3 --- /dev/null +++ b/src/python/_framework/face/interfaces.py @@ -0,0 +1,545 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces for the face layer of RPC Framework.""" + +import abc + +# exceptions, abandonment, and future are referenced from specification in this +# module. +from _framework.face import exceptions # pylint: disable=unused-import +from _framework.foundation import abandonment # pylint: disable=unused-import +from _framework.foundation import future # pylint: disable=unused-import + + +class CancellableIterator(object): + """Implements the Iterator protocol and affords a cancel method.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __iter__(self): + """Returns the self object in accordance with the Iterator protocol.""" + raise NotImplementedError() + + @abc.abstractmethod + def next(self): + """Returns a value or raises StopIteration per the Iterator protocol.""" + raise NotImplementedError() + + @abc.abstractmethod + def cancel(self): + """Requests cancellation of whatever computation underlies this iterator.""" + raise NotImplementedError() + + +# Constants that categorize RPC abortion. +# TODO(nathaniel): Learn and use Python's enum library for this de facto +# enumerated type +CANCELLED = 'abortion: cancelled' +EXPIRED = 'abortion: expired' +NETWORK_FAILURE = 'abortion: network failure' +SERVICED_FAILURE = 'abortion: serviced failure' +SERVICER_FAILURE = 'abortion: servicer failure' + + +class RpcContext(object): + """Provides RPC-related information and action.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def is_active(self): + """Describes whether the RPC is active or has terminated.""" + raise NotImplementedError() + + @abc.abstractmethod + def time_remaining(self): + """Describes the length of allowed time remaining for the RPC. + + Returns: + A nonnegative float indicating the length of allowed time in seconds + remaining for the RPC to complete before it is considered to have timed + out. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_abortion_callback(self, abortion_callback): + """Registers a callback to be called if the RPC is aborted. + + Args: + abortion_callback: A callable to be called and passed one of CANCELLED, + EXPIRED, NETWORK_FAILURE, SERVICED_FAILURE, or SERVICER_FAILURE in the + event of RPC abortion. + """ + raise NotImplementedError() + + +class InlineValueInValueOutMethod(object): + """A type for inline unary-request-unary-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, context): + """Services an RPC that accepts one value and produces one value. + + Args: + request: The single request value for the RPC. + context: An RpcContext object. + + Returns: + The single response value for the RPC. + + Raises: + abandonment.Abandoned: If no response is necessary because the RPC has + been aborted. + """ + raise NotImplementedError() + + +class InlineValueInStreamOutMethod(object): + """A type for inline unary-request-stream-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, context): + """Services an RPC that accepts one value and produces a stream of values. + + Args: + request: The single request value for the RPC. + context: An RpcContext object. + + Yields: + The values that comprise the response stream of the RPC. + + Raises: + abandonment.Abandoned: If completing the response stream is not necessary + because the RPC has been aborted. + """ + raise NotImplementedError() + + +class InlineStreamInValueOutMethod(object): + """A type for inline stream-request-unary-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request_iterator, context): + """Services an RPC that accepts a stream of values and produces one value. + + Args: + request_iterator: An iterator that yields the request values of the RPC. + Drawing values from this iterator may also raise exceptions.RpcError to + indicate abortion of the RPC. + context: An RpcContext object. + + Yields: + The values that comprise the response stream of the RPC. + + Raises: + abandonment.Abandoned: If no response is necessary because the RPC has + been aborted. + exceptions.RpcError: Implementations of this method must not deliberately + raise exceptions.RpcError but may allow such errors raised by the + request_iterator passed to them to propagate through their bodies + uncaught. + """ + raise NotImplementedError() + + +class InlineStreamInStreamOutMethod(object): + """A type for inline stream-request-stream-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request_iterator, context): + """Services an RPC that accepts and produces streams of values. + + Args: + request_iterator: An iterator that yields the request values of the RPC. + Drawing values from this iterator may also raise exceptions.RpcError to + indicate abortion of the RPC. + context: An RpcContext object. + + Yields: + The values that comprise the response stream of the RPC. + + Raises: + abandonment.Abandoned: If completing the response stream is not necessary + because the RPC has been aborted. + exceptions.RpcError: Implementations of this method must not deliberately + raise exceptions.RpcError but may allow such errors raised by the + request_iterator passed to them to propagate through their bodies + uncaught. + """ + raise NotImplementedError() + + +class EventValueInValueOutMethod(object): + """A type for event-driven unary-request-unary-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_callback, context): + """Services an RPC that accepts one value and produces one value. + + Args: + request: The single request value for the RPC. + response_callback: A callback to be called to accept the response value of + the RPC. + context: An RpcContext object. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class EventValueInStreamOutMethod(object): + """A type for event-driven unary-request-stream-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_consumer, context): + """Services an RPC that accepts one value and produces a stream of values. + + Args: + request: The single request value for the RPC. + response_consumer: A stream.Consumer to be called to accept the response + values of the RPC. + context: An RpcContext object. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class EventStreamInValueOutMethod(object): + """A type for event-driven stream-request-unary-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_callback, context): + """Services an RPC that accepts a stream of values and produces one value. + + Args: + response_callback: A callback to be called to accept the response value of + the RPC. + context: An RpcContext object. + + Returns: + A stream.Consumer with which to accept the request values of the RPC. The + consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing values to this object. Implementations must not assume that this + object will be called to completion of the request stream or even called + at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class EventStreamInStreamOutMethod(object): + """A type for event-driven stream-request-stream-response RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_consumer, context): + """Services an RPC that accepts and produces streams of values. + + Args: + response_consumer: A stream.Consumer to be called to accept the response + values of the RPC. + context: An RpcContext object. + + Returns: + A stream.Consumer with which to accept the request values of the RPC. The + consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing values to this object. Implementations must not assume that this + object will be called to completion of the request stream or even called + at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class MultiMethod(object): + """A general type able to service many RPC methods.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, name, response_consumer, context): + """Services an RPC. + + Args: + name: The RPC method name. + response_consumer: A stream.Consumer to be called to accept the response + values of the RPC. + context: An RpcContext object. + + Returns: + A stream.Consumer with which to accept the request values of the RPC. The + consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing values to this object. Implementations must not assume that this + object will be called to completion of the request stream or even called + at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + exceptions.NoSuchMethodError: If this MultiMethod does not recognize the + given RPC method name and is not able to service the RPC. + """ + raise NotImplementedError() + + +class Server(object): + """Specification of a running server that services RPCs.""" + __metaclass__ = abc.ABCMeta + + +class Call(object): + """Invocation-side representation of an RPC. + + Attributes: + context: An RpcContext affording information about the RPC. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def cancel(self): + """Requests cancellation of the RPC.""" + raise NotImplementedError() + + +class Stub(object): + """Affords RPC methods to callers.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def blocking_value_in_value_out(self, name, request, timeout): + """Invokes a unary-request-unary-response RPC method. + + This method blocks until either returning the response value of the RPC + (in the event of RPC completion) or raising an exception (in the event of + RPC abortion). + + Args: + name: The RPC method name. + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + The response value for the RPC. + + Raises: + exceptions.RpcError: Indicating that the RPC was aborted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def future_value_in_value_out(self, name, request, timeout): + """Invokes a unary-request-unary-response RPC method. + + Args: + name: The RPC method name. + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A future.Future representing the RPC. In the event of RPC completion, the + returned Future will return an outcome indicating that the RPC returned + the response value of the RPC. In the event of RPC abortion, the + returned Future will return an outcome indicating that the RPC raised + an exceptions.RpcError. + """ + raise NotImplementedError() + + @abc.abstractmethod + def inline_value_in_stream_out(self, name, request, timeout): + """Invokes a unary-request-stream-response RPC method. + + Args: + name: The RPC method name. + request: The request value for the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A CancellableIterator that yields the response values of the RPC and + affords RPC cancellation. Drawing response values from the returned + CancellableIterator may raise exceptions.RpcError indicating abortion of + the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def blocking_stream_in_value_out(self, name, request_iterator, timeout): + """Invokes a stream-request-unary-response RPC method. + + This method blocks until either returning the response value of the RPC + (in the event of RPC completion) or raising an exception (in the event of + RPC abortion). + + Args: + name: The RPC method name. + request_iterator: An iterator that yields the request values of the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + The response value for the RPC. + + Raises: + exceptions.RpcError: Indicating that the RPC was aborted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def future_stream_in_value_out(self, name, request_iterator, timeout): + """Invokes a stream-request-unary-response RPC method. + + Args: + name: The RPC method name. + request_iterator: An iterator that yields the request values of the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A future.Future representing the RPC. In the event of RPC completion, the + returned Future will return an outcome indicating that the RPC returned + the response value of the RPC. In the event of RPC abortion, the + returned Future will return an outcome indicating that the RPC raised + an exceptions.RpcError. + """ + raise NotImplementedError() + + @abc.abstractmethod + def inline_stream_in_stream_out(self, name, request_iterator, timeout): + """Invokes a stream-request-stream-response RPC method. + + Args: + name: The RPC method name. + request_iterator: An iterator that yields the request values of the RPC. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A CancellableIterator that yields the response values of the RPC and + affords RPC cancellation. Drawing response values from the returned + CancellableIterator may raise exceptions.RpcError indicating abortion of + the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def event_value_in_value_out( + self, name, request, response_callback, abortion_callback, timeout): + """Event-driven invocation of a unary-request-unary-response RPC method. + + Args: + name: The RPC method name. + request: The request value for the RPC. + response_callback: A callback to be called to accept the response value + of the RPC. + abortion_callback: A callback to be called to accept one of CANCELLED, + EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC + abortion. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A Call object for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def event_value_in_stream_out( + self, name, request, response_consumer, abortion_callback, timeout): + """Event-driven invocation of a unary-request-stream-response RPC method. + + Args: + name: The RPC method name. + request: The request value for the RPC. + response_consumer: A stream.Consumer to be called to accept the response + values of the RPC. + abortion_callback: A callback to be called to accept one of CANCELLED, + EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC + abortion. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A Call object for the RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def event_stream_in_value_out( + self, name, response_callback, abortion_callback, timeout): + """Event-driven invocation of a unary-request-unary-response RPC method. + + Args: + name: The RPC method name. + response_callback: A callback to be called to accept the response value + of the RPC. + abortion_callback: A callback to be called to accept one of CANCELLED, + EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC + abortion. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A pair of a Call object for the RPC and a stream.Consumer to which the + request values of the RPC should be passed. + """ + raise NotImplementedError() + + @abc.abstractmethod + def event_stream_in_stream_out( + self, name, response_consumer, abortion_callback, timeout): + """Event-driven invocation of a unary-request-stream-response RPC method. + + Args: + name: The RPC method name. + response_consumer: A stream.Consumer to be called to accept the response + values of the RPC. + abortion_callback: A callback to be called to accept one of CANCELLED, + EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC + abortion. + timeout: A duration of time in seconds to allow for the RPC. + + Returns: + A pair of a Call object for the RPC and a stream.Consumer to which the + request values of the RPC should be passed. + """ + raise NotImplementedError() diff --git a/src/python/_framework/face/testing/__init__.py b/src/python/_framework/face/testing/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/_framework/face/testing/__init__.py diff --git a/src/python/_framework/face/testing/base_util.py b/src/python/_framework/face/testing/base_util.py new file mode 100644 index 0000000000..d9ccb3af8f --- /dev/null +++ b/src/python/_framework/face/testing/base_util.py @@ -0,0 +1,102 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for creating Base-layer objects for use in Face-layer tests.""" + +import abc + +# interfaces is referenced from specification in this module. +from _framework.base import util as _base_util +from _framework.base.packets import implementations +from _framework.base.packets import in_memory +from _framework.base.packets import interfaces # pylint: disable=unused-import +from _framework.foundation import logging_pool + +_POOL_SIZE_LIMIT = 20 + +_MAXIMUM_TIMEOUT = 90 + + +class LinkedPair(object): + """A Front and Back that are linked to one another. + + Attributes: + front: An interfaces.Front. + back: An interfaces.Back. + """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def shut_down(self): + """Shuts down this object and releases its resources.""" + raise NotImplementedError() + + +class _LinkedPair(LinkedPair): + + def __init__(self, front, back, pools): + self.front = front + self.back = back + self._pools = pools + + def shut_down(self): + _base_util.wait_for_idle(self.front) + _base_util.wait_for_idle(self.back) + + for pool in self._pools: + pool.shutdown(wait=True) + + +def linked_pair(servicer, default_timeout): + """Creates a Server and Stub linked together for use.""" + link_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + pools = ( + link_pool, + front_work_pool, front_transmission_pool, front_utility_pool, + back_work_pool, back_transmission_pool, back_utility_pool) + + link = in_memory.Link(link_pool) + front = implementations.front( + front_work_pool, front_transmission_pool, front_utility_pool) + back = implementations.back( + servicer, back_work_pool, back_transmission_pool, back_utility_pool, + default_timeout, _MAXIMUM_TIMEOUT) + front.join_rear_link(link) + link.join_fore_link(front) + back.join_fore_link(link) + link.join_rear_link(back) + + return _LinkedPair(front, back, pools) diff --git a/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py new file mode 100644 index 0000000000..0b1a2f0bd2 --- /dev/null +++ b/src/python/_framework/face/testing/blocking_invocation_inline_service_test_case.py @@ -0,0 +1,223 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A test to verify an implementation of the Face layer of RPC Framework.""" + +# unittest is referenced from specification in this module. +import abc +import unittest # pylint: disable=unused-import + +from _framework.face import exceptions +from _framework.face.testing import control +from _framework.face.testing import coverage +from _framework.face.testing import digest +from _framework.face.testing import stock_service +from _framework.face.testing import test_case + +_TIMEOUT = 3 + + +class BlockingInvocationInlineServiceTestCase( + test_case.FaceTestCase, coverage.BlockingCoverage): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must also extend unittest.TestCase. + """ + __metaclass__ = abc.ABCMeta + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self.control = control.PauseFailControl() + self.digest = digest.digest( + stock_service.STOCK_TEST_SERVICE, self.control, None) + + self.server, self.stub, self.memo = self.set_up_implementation( + self.digest.name, self.digest.methods, + self.digest.inline_unary_unary_methods, + self.digest.inline_unary_stream_methods, + self.digest.inline_stream_unary_methods, + self.digest.inline_stream_stream_methods, + {}, {}, {}, {}, None) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.tear_down_implementation(self.memo) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response = self.stub.blocking_value_in_value_out( + name, request, _TIMEOUT) + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response = self.stub.blocking_stream_in_value_out( + name, iter(requests), _TIMEOUT) + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response = self.stub.blocking_value_in_value_out( + name, first_request, _TIMEOUT) + + test_messages.verify(first_request, first_response, self) + + second_response = self.stub.blocking_value_in_value_out( + name, second_request, _TIMEOUT) + + test_messages.verify(second_request, second_response, self) + + def testExpiredUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + self.stub.blocking_value_in_value_out(name, request, _TIMEOUT) + + def testExpiredUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT) + + def testExpiredStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + self.stub.blocking_value_in_value_out(name, request, _TIMEOUT) + + def testFailedUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + self.stub.blocking_stream_in_value_out(name, iter(requests), _TIMEOUT) + + def testFailedStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + list(response_iterator) diff --git a/src/python/_framework/face/testing/callback.py b/src/python/_framework/face/testing/callback.py new file mode 100644 index 0000000000..7a20869abe --- /dev/null +++ b/src/python/_framework/face/testing/callback.py @@ -0,0 +1,94 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A utility useful in tests of asynchronous, event-driven interfaces.""" + +import threading + +from _framework.foundation import stream + + +class Callback(stream.Consumer): + """A utility object useful in tests of asynchronous code.""" + + def __init__(self): + self._condition = threading.Condition() + self._unary_response = None + self._streamed_responses = [] + self._completed = False + self._abortion = None + + def abort(self, abortion): + with self._condition: + self._abortion = abortion + self._condition.notify_all() + + def complete(self, unary_response): + with self._condition: + self._unary_response = unary_response + self._completed = True + self._condition.notify_all() + + def consume(self, streamed_response): + with self._condition: + self._streamed_responses.append(streamed_response) + + def terminate(self): + with self._condition: + self._completed = True + self._condition.notify_all() + + def consume_and_terminate(self, streamed_response): + with self._condition: + self._streamed_responses.append(streamed_response) + self._completed = True + self._condition.notify_all() + + def block_until_terminated(self): + with self._condition: + while self._abortion is None and not self._completed: + self._condition.wait() + + def response(self): + with self._condition: + if self._abortion is None: + return self._unary_response + else: + raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + + def responses(self): + with self._condition: + if self._abortion is None: + return list(self._streamed_responses) + else: + raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + + def abortion(self): + with self._condition: + return self._abortion diff --git a/src/python/_framework/face/testing/control.py b/src/python/_framework/face/testing/control.py new file mode 100644 index 0000000000..3960c4e649 --- /dev/null +++ b/src/python/_framework/face/testing/control.py @@ -0,0 +1,87 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Code for instructing systems under test to block or fail.""" + +import abc +import contextlib +import threading + + +class Control(object): + """An object that accepts program control from a system under test. + + Systems under test passed a Control should call its control() method + frequently during execution. The control() method may block, raise an + exception, or do nothing, all according to the enclosing test's desire for + the system under test to simulate hanging, failing, or functioning. + """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def control(self): + """Potentially does anything.""" + raise NotImplementedError() + + +class PauseFailControl(Control): + """A Control that can be used to pause or fail code under control.""" + + def __init__(self): + self._condition = threading.Condition() + self._paused = False + self._fail = False + + def control(self): + with self._condition: + if self._fail: + raise ValueError() + + while self._paused: + self._condition.wait() + + @contextlib.contextmanager + def pause(self): + """Pauses code under control while controlling code is in context.""" + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + @contextlib.contextmanager + def fail(self): + """Fails code under control while controlling code is in context.""" + with self._condition: + self._fail = True + yield + with self._condition: + self._fail = False diff --git a/src/python/_framework/face/testing/coverage.py b/src/python/_framework/face/testing/coverage.py new file mode 100644 index 0000000000..f3aca113fe --- /dev/null +++ b/src/python/_framework/face/testing/coverage.py @@ -0,0 +1,123 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Governs coverage for the tests of the Face layer of RPC Framework.""" + +import abc + +# These classes are only valid when inherited by unittest.TestCases. +# pylint: disable=invalid-name + + +class BlockingCoverage(object): + """Specification of test coverage for blocking behaviors.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def testSuccessfulUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulStreamRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSequentialInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredStreamRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedStreamRequestStreamResponse(self): + raise NotImplementedError() + + +class FullCoverage(BlockingCoverage): + """Specification of test coverage for non-blocking behaviors.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def testParallelInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledStreamRequestStreamResponse(self): + raise NotImplementedError() diff --git a/src/python/_framework/face/testing/digest.py b/src/python/_framework/face/testing/digest.py new file mode 100644 index 0000000000..8d1291c975 --- /dev/null +++ b/src/python/_framework/face/testing/digest.py @@ -0,0 +1,446 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Code for making a service.TestService more amenable to use in tests.""" + +import collections +import threading + +# testing_control, interfaces, and testing_service are referenced from +# specification in this module. +from _framework.face import exceptions +from _framework.face import interfaces as face_interfaces +from _framework.face.testing import control as testing_control # pylint: disable=unused-import +from _framework.face.testing import interfaces # pylint: disable=unused-import +from _framework.face.testing import service as testing_service # pylint: disable=unused-import +from _framework.foundation import stream +from _framework.foundation import stream_util + +_IDENTITY = lambda x: x + + +class TestServiceDigest( + collections.namedtuple( + 'TestServiceDigest', + ['name', + 'methods', + 'inline_unary_unary_methods', + 'inline_unary_stream_methods', + 'inline_stream_unary_methods', + 'inline_stream_stream_methods', + 'event_unary_unary_methods', + 'event_unary_stream_methods', + 'event_stream_unary_methods', + 'event_stream_stream_methods', + 'multi_method', + 'unary_unary_messages_sequences', + 'unary_stream_messages_sequences', + 'stream_unary_messages_sequences', + 'stream_stream_messages_sequences'])): + """A transformation of a service.TestService. + + Attributes: + name: The RPC service name to be used in the test. + methods: A sequence of interfaces.Method objects describing the RPC + methods that will be called during the test. + inline_unary_unary_methods: A dict from method name to + face_interfaces.InlineValueInValueOutMethod object to be used in tests of + in-line calls to behaviors under test. + inline_unary_stream_methods: A dict from method name to + face_interfaces.InlineValueInStreamOutMethod object to be used in tests of + in-line calls to behaviors under test. + inline_stream_unary_methods: A dict from method name to + face_interfaces.InlineStreamInValueOutMethod object to be used in tests of + in-line calls to behaviors under test. + inline_stream_stream_methods: A dict from method name to + face_interfaces.InlineStreamInStreamOutMethod object to be used in tests + of in-line calls to behaviors under test. + event_unary_unary_methods: A dict from method name to + face_interfaces.EventValueInValueOutMethod object to be used in tests of + event-driven calls to behaviors under test. + event_unary_stream_methods: A dict from method name to + face_interfaces.EventValueInStreamOutMethod object to be used in tests of + event-driven calls to behaviors under test. + event_stream_unary_methods: A dict from method name to + face_interfaces.EventStreamInValueOutMethod object to be used in tests of + event-driven calls to behaviors under test. + event_stream_stream_methods: A dict from method name to + face_interfaces.EventStreamInStreamOutMethod object to be used in tests of + event-driven calls to behaviors under test. + multi_method: A face_interfaces.MultiMethod to be used in tests of generic + calls to behaviors under test. + unary_unary_messages_sequences: A dict from method name to sequence of + service.UnaryUnaryTestMessages objects to be used to test the method + with the given name. + unary_stream_messages_sequences: A dict from method name to sequence of + service.UnaryStreamTestMessages objects to be used to test the method + with the given name. + stream_unary_messages_sequences: A dict from method name to sequence of + service.StreamUnaryTestMessages objects to be used to test the method + with the given name. + stream_stream_messages_sequences: A dict from method name to sequence of + service.StreamStreamTestMessages objects to be used to test the + method with the given name. + serialization: A serial.Serialization object describing serialization + behaviors for all the RPC methods. + """ + + +class _BufferingConsumer(stream.Consumer): + """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" + + def __init__(self): + self.consumed = [] + self.terminated = False + + def consume(self, value): + self.consumed.append(value) + + def terminate(self): + self.terminated = True + + def consume_and_terminate(self, value): + self.consumed.append(value) + self.terminated = True + + +class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod): + + def __init__(self, unary_unary_test_method, control): + self._test_method = unary_unary_test_method + self._control = control + + def service(self, request, context): + response_list = [] + self._test_method.service( + request, response_list.append, context, self._control) + return response_list.pop(0) + + +class _EventUnaryUnaryMethod(face_interfaces.EventValueInValueOutMethod): + + def __init__(self, unary_unary_test_method, control, pool): + self._test_method = unary_unary_test_method + self._control = control + self._pool = pool + + def service(self, request, response_callback, context): + if self._pool is None: + self._test_method.service( + request, response_callback, context, self._control) + else: + self._pool.submit( + self._test_method.service, request, response_callback, context, + self._control) + + +class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod): + + def __init__(self, unary_stream_test_method, control): + self._test_method = unary_stream_test_method + self._control = control + + def service(self, request, context): + response_consumer = _BufferingConsumer() + self._test_method.service( + request, response_consumer, context, self._control) + for response in response_consumer.consumed: + yield response + + +class _EventUnaryStreamMethod(face_interfaces.EventValueInStreamOutMethod): + + def __init__(self, unary_stream_test_method, control, pool): + self._test_method = unary_stream_test_method + self._control = control + self._pool = pool + + def service(self, request, response_consumer, context): + if self._pool is None: + self._test_method.service( + request, response_consumer, context, self._control) + else: + self._pool.submit( + self._test_method.service, request, response_consumer, context, + self._control) + + +class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod): + + def __init__(self, stream_unary_test_method, control): + self._test_method = stream_unary_test_method + self._control = control + + def service(self, request_iterator, context): + response_list = [] + request_consumer = self._test_method.service( + response_list.append, context, self._control) + for request in request_iterator: + request_consumer.consume(request) + request_consumer.terminate() + return response_list.pop(0) + + +class _EventStreamUnaryMethod(face_interfaces.EventStreamInValueOutMethod): + + def __init__(self, stream_unary_test_method, control, pool): + self._test_method = stream_unary_test_method + self._control = control + self._pool = pool + + def service(self, response_callback, context): + request_consumer = self._test_method.service( + response_callback, context, self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod): + + def __init__(self, stream_stream_test_method, control): + self._test_method = stream_stream_test_method + self._control = control + + def service(self, request_iterator, context): + response_consumer = _BufferingConsumer() + request_consumer = self._test_method.service( + response_consumer, context, self._control) + + for request in request_iterator: + request_consumer.consume(request) + while response_consumer.consumed: + yield response_consumer.consumed.pop(0) + response_consumer.terminate() + + +class _EventStreamStreamMethod(face_interfaces.EventStreamInStreamOutMethod): + + def __init__(self, stream_stream_test_method, control, pool): + self._test_method = stream_stream_test_method + self._control = control + self._pool = pool + + def service(self, response_consumer, context): + request_consumer = self._test_method.service( + response_consumer, context, self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _UnaryConsumer(stream.Consumer): + """A Consumer that only allows consumption of exactly one value.""" + + def __init__(self, action): + self._lock = threading.Lock() + self._action = action + self._consumed = False + self._terminated = False + + def consume(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + + self._action(value) + + def terminate(self): + with self._lock: + if not self._consumed: + raise ValueError('Unary consumer hasn\'t yet consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._terminated = True + + def consume_and_terminate(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + self._terminated = True + + self._action(value) + + +class _UnaryUnaryAdaptation(object): + + def __init__(self, unary_unary_test_method): + self._method = unary_unary_test_method + + def service(self, response_consumer, context, control): + def action(request): + self._method.service( + request, response_consumer.consume_and_terminate, context, control) + return _UnaryConsumer(action) + + +class _UnaryStreamAdaptation(object): + + def __init__(self, unary_stream_test_method): + self._method = unary_stream_test_method + + def service(self, response_consumer, context, control): + def action(request): + self._method.service(request, response_consumer, context, control) + return _UnaryConsumer(action) + + +class _StreamUnaryAdaptation(object): + + def __init__(self, stream_unary_test_method): + self._method = stream_unary_test_method + + def service(self, response_consumer, context, control): + return self._method.service( + response_consumer.consume_and_terminate, context, control) + + +class _MultiMethod(face_interfaces.MultiMethod): + + def __init__(self, methods, control, pool): + self._methods = methods + self._control = control + self._pool = pool + + def service(self, name, response_consumer, context): + method = self._methods.get(name, None) + if method is None: + raise exceptions.NoSuchMethodError(name) + elif self._pool is None: + return method(response_consumer, context, self._control) + else: + request_consumer = method(response_consumer, context, self._control) + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _Assembly( + collections.namedtuple( + '_Assembly', + ['methods', 'inlines', 'events', 'adaptations', 'messages'])): + """An intermediate structure created when creating a TestServiceDigest.""" + + +def _assemble( + scenarios, names, inline_method_constructor, event_method_constructor, + adapter, control, pool): + """Creates an _Assembly from the given scenarios.""" + methods = [] + inlines = {} + events = {} + adaptations = {} + messages = {} + for name, scenario in scenarios.iteritems(): + if name in names: + raise ValueError('Repeated name "%s"!' % name) + + test_method = scenario[0] + inline_method = inline_method_constructor(test_method, control) + event_method = event_method_constructor(test_method, control, pool) + adaptation = adapter(test_method) + + methods.append(test_method) + inlines[name] = inline_method + events[name] = event_method + adaptations[name] = adaptation + messages[name] = scenario[1] + + return _Assembly(methods, inlines, events, adaptations, messages) + + +def digest(service, control, pool): + """Creates a TestServiceDigest from a TestService. + + Args: + service: A testing_service.TestService. + control: A testing_control.Control. + pool: If RPC methods should be serviced in a separate thread, a thread pool. + None if RPC methods should be serviced in the thread belonging to the + run-time that calls for their service. + + Returns: + A TestServiceDigest synthesized from the given service.TestService. + """ + names = set() + + unary_unary = _assemble( + service.unary_unary_scenarios(), names, _InlineUnaryUnaryMethod, + _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool) + names.update(set(unary_unary.inlines)) + + unary_stream = _assemble( + service.unary_stream_scenarios(), names, _InlineUnaryStreamMethod, + _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool) + names.update(set(unary_stream.inlines)) + + stream_unary = _assemble( + service.stream_unary_scenarios(), names, _InlineStreamUnaryMethod, + _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool) + names.update(set(stream_unary.inlines)) + + stream_stream = _assemble( + service.stream_stream_scenarios(), names, _InlineStreamStreamMethod, + _EventStreamStreamMethod, _IDENTITY, control, pool) + names.update(set(stream_stream.inlines)) + + methods = list(unary_unary.methods) + methods.extend(unary_stream.methods) + methods.extend(stream_unary.methods) + methods.extend(stream_stream.methods) + adaptations = dict(unary_unary.adaptations) + adaptations.update(unary_stream.adaptations) + adaptations.update(stream_unary.adaptations) + adaptations.update(stream_stream.adaptations) + + return TestServiceDigest( + service.name(), + methods, + unary_unary.inlines, + unary_stream.inlines, + stream_unary.inlines, + stream_stream.inlines, + unary_unary.events, + unary_stream.events, + stream_unary.events, + stream_stream.events, + _MultiMethod(adaptations, control, pool), + unary_unary.messages, + unary_stream.messages, + stream_unary.messages, + stream_stream.messages) diff --git a/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py new file mode 100644 index 0000000000..dba73a9368 --- /dev/null +++ b/src/python/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py @@ -0,0 +1,367 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A test to verify an implementation of the Face layer of RPC Framework.""" + +import abc +import unittest + +from _framework.face import interfaces +from _framework.face.testing import callback as testing_callback +from _framework.face.testing import control +from _framework.face.testing import coverage +from _framework.face.testing import digest +from _framework.face.testing import stock_service +from _framework.face.testing import test_case + +_TIMEOUT = 3 + + +class EventInvocationSynchronousEventServiceTestCase( + test_case.FaceTestCase, coverage.FullCoverage): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must also extend unittest.TestCase. + """ + __metaclass__ = abc.ABCMeta + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self.control = control.PauseFailControl() + self.digest = digest.digest( + stock_service.STOCK_TEST_SERVICE, self.control, None) + + self.server, self.stub, self.memo = self.set_up_implementation( + self.digest.name, self.digest.methods, + {}, {}, {}, {}, + self.digest.event_unary_unary_methods, + self.digest.event_unary_stream_methods, + self.digest.event_stream_unary_methods, + self.digest.event_stream_stream_methods, + None) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.tear_down_implementation(self.memo) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, _TIMEOUT) + callback.block_until_terminated() + response = callback.response() + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, _TIMEOUT) + callback.block_until_terminated() + responses = callback.responses() + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + unused_call, request_consumer = self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + response = callback.response() + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + unused_call, request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + responses = callback.responses() + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + # pylint: disable=cell-var-from-loop + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + first_callback = testing_callback.Callback() + second_callback = testing_callback.Callback() + + def make_second_invocation(first_response): + first_callback.complete(first_response) + self.stub.event_value_in_value_out( + name, second_request, second_callback.complete, + second_callback.abort, _TIMEOUT) + + self.stub.event_value_in_value_out( + name, first_request, make_second_invocation, first_callback.abort, + _TIMEOUT) + second_callback.block_until_terminated() + + first_response = first_callback.response() + second_response = second_callback.response() + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + def testExpiredUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.pause(): + self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, _TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.EXPIRED, callback.abortion()) + + def testExpiredUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.pause(): + self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, _TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.EXPIRED, callback.abortion()) + + def testExpiredStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for unused_test_messages in test_messages_sequence: + callback = testing_callback.Callback() + + self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, _TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.EXPIRED, callback.abortion()) + + def testExpiredStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + unused_call, request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + callback.block_until_terminated() + + self.assertEqual(interfaces.EXPIRED, callback.abortion()) + + def testFailedUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.fail(): + self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, _TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + + def testFailedUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.fail(): + self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, _TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + + def testFailedStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + with self.control.fail(): + unused_call, request_consumer = self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + + self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + + def testFailedStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + with self.control.fail(): + unused_call, request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + + self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + + def testParallelInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + first_callback = testing_callback.Callback() + second_request = test_messages.request() + second_callback = testing_callback.Callback() + + self.stub.event_value_in_value_out( + name, first_request, first_callback.complete, first_callback.abort, + _TIMEOUT) + self.stub.event_value_in_value_out( + name, second_request, second_callback.complete, + second_callback.abort, _TIMEOUT) + first_callback.block_until_terminated() + second_callback.block_until_terminated() + + first_response = first_callback.response() + second_response = second_callback.response() + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + @unittest.skip('TODO(nathaniel): implement.') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + def testCancelledUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.pause(): + call = self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, _TIMEOUT) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.CANCELLED, callback.abortion()) + + def testCancelledUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + call = self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, _TIMEOUT) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.CANCELLED, callback.abortion()) + + def testCancelledStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + call, request_consumer = self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, _TIMEOUT) + for request in requests: + request_consumer.consume(request) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.CANCELLED, callback.abortion()) + + def testCancelledStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for unused_test_messages in test_messages_sequence: + callback = testing_callback.Callback() + + call, unused_request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, _TIMEOUT) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.CANCELLED, callback.abortion()) diff --git a/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py new file mode 100644 index 0000000000..cf8b2eeb95 --- /dev/null +++ b/src/python/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -0,0 +1,377 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A test to verify an implementation of the Face layer of RPC Framework.""" + +import abc +import contextlib +import threading +import unittest + +from _framework.face import exceptions +from _framework.face.testing import control +from _framework.face.testing import coverage +from _framework.face.testing import digest +from _framework.face.testing import stock_service +from _framework.face.testing import test_case +from _framework.foundation import future +from _framework.foundation import logging_pool + +_TIMEOUT = 3 +_MAXIMUM_POOL_SIZE = 100 + + +class _PauseableIterator(object): + + def __init__(self, upstream): + self._upstream = upstream + self._condition = threading.Condition() + self._paused = False + + @contextlib.contextmanager + def pause(self): + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + def __iter__(self): + return self + + def next(self): + with self._condition: + while self._paused: + self._condition.wait() + return next(self._upstream) + + +class FutureInvocationAsynchronousEventServiceTestCase( + test_case.FaceTestCase, coverage.FullCoverage): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must also extend unittest.TestCase. + """ + __metaclass__ = abc.ABCMeta + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self.control = control.PauseFailControl() + self.digest_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE) + self.digest = digest.digest( + stock_service.STOCK_TEST_SERVICE, self.control, self.digest_pool) + + self.server, self.stub, self.memo = self.set_up_implementation( + self.digest.name, self.digest.methods, + {}, {}, {}, {}, + self.digest.event_unary_unary_methods, + self.digest.event_unary_stream_methods, + self.digest.event_stream_unary_methods, + self.digest.event_stream_stream_methods, + None) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.tear_down_implementation(self.memo) + self.digest_pool.shutdown(wait=True) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_future = self.stub.future_value_in_value_out( + name, request, _TIMEOUT) + response = response_future.outcome().return_value + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_future = self.stub.future_stream_in_value_out( + name, request_iterator, _TIMEOUT) + response = response_future.outcome().return_value + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_iterator = self.stub.inline_stream_in_stream_out( + name, request_iterator, _TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self.stub.future_value_in_value_out( + name, first_request, _TIMEOUT) + first_response = first_response_future.outcome().return_value + + test_messages.verify(first_request, first_response, self) + + second_response_future = self.stub.future_value_in_value_out( + name, second_request, _TIMEOUT) + second_response = second_response_future.outcome().return_value + + test_messages.verify(second_request, second_response, self) + + def testExpiredUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(): + response_future = self.stub.future_value_in_value_out( + name, request, _TIMEOUT) + outcome = response_future.outcome() + + self.assertIsInstance( + outcome.exception, exceptions.ExpirationError) + + def testExpiredUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(): + response_future = self.stub.future_stream_in_value_out( + name, iter(requests), _TIMEOUT) + outcome = response_future.outcome() + + self.assertIsInstance( + outcome.exception, exceptions.ExpirationError) + + def testExpiredStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.fail(): + response_future = self.stub.future_value_in_value_out( + name, request, _TIMEOUT) + outcome = response_future.outcome() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_callback before the + # expiration of the RPC. + self.assertIsInstance(outcome.exception, exceptions.ExpirationError) + + def testFailedUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self.control.fail(), self.assertRaises(exceptions.ExpirationError): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.fail(): + response_future = self.stub.future_stream_in_value_out( + name, iter(requests), _TIMEOUT) + outcome = response_future.outcome() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_callback before the + # expiration of the RPC. + self.assertIsInstance(outcome.exception, exceptions.ExpirationError) + + def testFailedStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self.control.fail(), self.assertRaises(exceptions.ExpirationError): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + list(response_iterator) + + def testParallelInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self.stub.future_value_in_value_out( + name, first_request, _TIMEOUT) + second_response_future = self.stub.future_value_in_value_out( + name, second_request, _TIMEOUT) + first_response = first_response_future.outcome().return_value + second_response = second_response_future.outcome().return_value + + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + @unittest.skip('TODO(nathaniel): implement.') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + def testCancelledUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(): + response_future = self.stub.future_value_in_value_out( + name, request, _TIMEOUT) + cancelled = response_future.cancel() + + self.assertFalse(cancelled) + self.assertEqual(future.ABORTED, response_future.outcome().category) + + def testCancelledUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, _TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(exceptions.CancellationError): + next(response_iterator) + + def testCancelledStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(): + response_future = self.stub.future_stream_in_value_out( + name, iter(requests), _TIMEOUT) + cancelled = response_future.cancel() + + self.assertFalse(cancelled) + self.assertEqual(future.ABORTED, response_future.outcome().category) + + def testCancelledStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), _TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(exceptions.CancellationError): + next(response_iterator) diff --git a/src/python/_framework/face/testing/interfaces.py b/src/python/_framework/face/testing/interfaces.py new file mode 100644 index 0000000000..253f6f118d --- /dev/null +++ b/src/python/_framework/face/testing/interfaces.py @@ -0,0 +1,117 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces implemented by data sets used in Face-layer tests.""" + +import abc + +# cardinality is referenced from specification in this module. +from _framework.common import cardinality # pylint: disable=unused-import + + +class Method(object): + """An RPC method to be used in tests of RPC implementations.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def name(self): + """Identify the name of the method. + + Returns: + The name of the method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cardinality(self): + """Identify the cardinality of the method. + + Returns: + A cardinality.Cardinality value describing the streaming semantics of the + method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def request_class(self): + """Identify the class used for the method's request objects. + + Returns: + The class object of the class to which the method's request objects + belong. + """ + raise NotImplementedError() + + @abc.abstractmethod + def response_class(self): + """Identify the class used for the method's response objects. + + Returns: + The class object of the class to which the method's response objects + belong. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_request(self, request): + """Serialize the given request object. + + Args: + request: A request object appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_request(self, serialized_request): + """Synthesize a request object from a given bytestring. + + Args: + serialized_request: A bytestring deserializable into a request object + appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_response(self, response): + """Serialize the given response object. + + Args: + response: A response object appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_response(self, serialized_response): + """Synthesize a response object from a given bytestring. + + Args: + serialized_response: A bytestring deserializable into a response object + appropriate for this method. + """ + raise NotImplementedError() diff --git a/src/python/_framework/face/testing/serial.py b/src/python/_framework/face/testing/serial.py new file mode 100644 index 0000000000..47fc5822de --- /dev/null +++ b/src/python/_framework/face/testing/serial.py @@ -0,0 +1,70 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utility for serialization in the context of test RPC services.""" + +import collections + + +class Serialization( + collections.namedtuple( + '_Serialization', + ['request_serializers', + 'request_deserializers', + 'response_serializers', + 'response_deserializers'])): + """An aggregation of serialization behaviors for an RPC service. + + Attributes: + request_serializers: A dict from method name to request object serializer + behavior. + request_deserializers: A dict from method name to request object + deserializer behavior. + response_serializers: A dict from method name to response object serializer + behavior. + response_deserializers: A dict from method name to response object + deserializer behavior. + """ + + +def serialization(methods): + """Creates a Serialization from a sequences of interfaces.Method objects.""" + request_serializers = {} + request_deserializers = {} + response_serializers = {} + response_deserializers = {} + for method in methods: + name = method.name() + request_serializers[name] = method.serialize_request + request_deserializers[name] = method.deserialize_request + response_serializers[name] = method.serialize_response + response_deserializers[name] = method.deserialize_response + return Serialization( + request_serializers, request_deserializers, response_serializers, + response_deserializers) diff --git a/src/python/_framework/face/testing/service.py b/src/python/_framework/face/testing/service.py new file mode 100644 index 0000000000..771346ec2e --- /dev/null +++ b/src/python/_framework/face/testing/service.py @@ -0,0 +1,337 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Private interfaces implemented by data sets used in Face-layer tests.""" + +import abc + +# interfaces is referenced from specification in this module. +from _framework.face import interfaces as face_interfaces # pylint: disable=unused-import +from _framework.face.testing import interfaces + + +class UnaryUnaryTestMethod(interfaces.Method): + """Like face_interfaces.EventValueInValueOutMethod but with a control.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_callback, context, control): + """Services an RPC that accepts one message and produces one message. + + Args: + request: The single request message for the RPC. + response_callback: A callback to be called to accept the response message + of the RPC. + context: An face_interfaces.RpcContext object. + control: A test_control.Control to control execution of this method. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class UnaryUnaryTestMessages(object): + """A type for unary-request-unary-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def request(self): + """Affords a request message. + + Implementations of this method should return a different message with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A request message. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, request, response, test_case): + """Verifies that the computed response matches the given request. + + Args: + request: A request message. + response: A response message. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the request and response do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class UnaryStreamTestMethod(interfaces.Method): + """Like face_interfaces.EventValueInStreamOutMethod but with a control.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_consumer, context, control): + """Services an RPC that takes one message and produces a stream of messages. + + Args: + request: The single request message for the RPC. + response_consumer: A stream.Consumer to be called to accept the response + messages of the RPC. + context: An RpcContext object. + control: A test_control.Control to control execution of this method. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class UnaryStreamTestMessages(object): + """A type for unary-request-stream-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def request(self): + """Affords a request message. + + Implementations of this method should return a different message with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A request message. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, request, responses, test_case): + """Verifies that the computed responses match the given request. + + Args: + request: A request message. + responses: A sequence of response messages. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the request and responses do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class StreamUnaryTestMethod(interfaces.Method): + """Like face_interfaces.EventStreamInValueOutMethod but with a control.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_callback, context, control): + """Services an RPC that takes a stream of messages and produces one message. + + Args: + response_callback: A callback to be called to accept the response message + of the RPC. + context: An RpcContext object. + control: A test_control.Control to control execution of this method. + + Returns: + A stream.Consumer with which to accept the request messages of the RPC. + The consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing messages to this object. Implementations must not assume that + this object will be called to completion of the request stream or even + called at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class StreamUnaryTestMessages(object): + """A type for stream-request-unary-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. + + Implementations of this method should return a different sequences with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A sequence of request messages. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, requests, response, test_case): + """Verifies that the computed response matches the given requests. + + Args: + requests: A sequence of request messages. + response: A response message. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the requests and response do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class StreamStreamTestMethod(interfaces.Method): + """Like face_interfaces.EventStreamInStreamOutMethod but with a control.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_consumer, context, control): + """Services an RPC that accepts and produces streams of messages. + + Args: + response_consumer: A stream.Consumer to be called to accept the response + messages of the RPC. + context: An RpcContext object. + control: A test_control.Control to control execution of this method. + + Returns: + A stream.Consumer with which to accept the request messages of the RPC. + The consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing messages to this object. Implementations must not assume that + this object will be called to completion of the request stream or even + called at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class StreamStreamTestMessages(object): + """A type for stream-request-stream-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. + + Implementations of this method should return a different sequences with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A sequence of request messages. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, requests, responses, test_case): + """Verifies that the computed response matches the given requests. + + Args: + requests: A sequence of request messages. + responses: A sequence of response messages. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the requests and responses do not match, indicating + that there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class TestService(object): + """A specification of implemented RPC methods to use in tests.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def name(self): + """Identifies the RPC service name used during the test. + + Returns: + The RPC service name to be used for the test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unary_unary_scenarios(self): + """Affords unary-request-unary-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair + is a UnaryUnaryTestMethod object and the second element is a sequence + of UnaryUnaryTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unary_stream_scenarios(self): + """Affords unary-request-stream-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair is a + UnaryStreamTestMethod object and the second element is a sequence of + UnaryStreamTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_unary_scenarios(self): + """Affords stream-request-unary-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair is a + StreamUnaryTestMethod object and the second element is a sequence of + StreamUnaryTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_stream_scenarios(self): + """Affords stream-request-stream-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair is a + StreamStreamTestMethod object and the second element is a sequence of + StreamStreamTestMethodMessages objects. + """ + raise NotImplementedError() diff --git a/src/python/_framework/face/testing/stock_service.py b/src/python/_framework/face/testing/stock_service.py new file mode 100644 index 0000000000..bd82877e83 --- /dev/null +++ b/src/python/_framework/face/testing/stock_service.py @@ -0,0 +1,374 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Examples of Python implementations of the stock.proto Stock service.""" + +from _framework.common import cardinality +from _framework.face.testing import service +from _framework.foundation import abandonment +from _framework.foundation import stream +from _framework.foundation import stream_util +from _junkdrawer import stock_pb2 + +SYMBOL_FORMAT = 'test symbol:%03d' +STREAM_LENGTH = 400 + +# A test-appropriate security-pricing function. :-P +_price = lambda symbol_name: float(hash(symbol_name) % 4096) + + +def _get_last_trade_price(stock_request, stock_reply_callback, control, active): + """A unary-request, unary-response test method.""" + control.control() + if active(): + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price(stock_request.symbol))) + else: + raise abandonment.Abandoned() + + +def _get_last_trade_price_multiple(stock_reply_consumer, control, active): + """A stream-request, stream-response test method.""" + def stock_reply_for_stock_request(stock_request): + control.control() + if active(): + return stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price(stock_request.symbol)) + else: + raise abandonment.Abandoned() + return stream_util.TransformingConsumer( + stock_reply_for_stock_request, stock_reply_consumer) + + +def _watch_future_trades(stock_request, stock_reply_consumer, control, active): + """A unary-request, stream-response test method.""" + base_price = _price(stock_request.symbol) + for index in range(stock_request.num_trades_to_watch): + control.control() + if active(): + stock_reply_consumer.consume( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=base_price + index)) + else: + raise abandonment.Abandoned() + stock_reply_consumer.terminate() + + +def _get_highest_trade_price(stock_reply_callback, control, active): + """A stream-request, unary-response test method.""" + + class StockRequestConsumer(stream.Consumer): + """Keeps an ongoing record of the most valuable symbol yet consumed.""" + + def __init__(self): + self._symbol = None + self._price = None + + def consume(self, stock_request): + control.control() + if active(): + if self._price is None: + self._symbol = stock_request.symbol + self._price = _price(stock_request.symbol) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + self._symbol = stock_request.symbol + self._price = candidate_price + + def terminate(self): + control.control() + if active(): + if self._symbol is None: + raise ValueError() + else: + stock_reply_callback( + stock_pb2.StockReply(symbol=self._symbol, price=self._price)) + self._symbol = None + self._price = None + + def consume_and_terminate(self, stock_request): + control.control() + if active(): + if self._price is None: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, + price=_price(stock_request.symbol))) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=candidate_price)) + else: + stock_reply_callback( + stock_pb2.StockReply( + symbol=self._symbol, price=self._price)) + + self._symbol = None + self._price = None + + return StockRequestConsumer() + + +class GetLastTradePrice(service.UnaryUnaryTestMethod): + """GetLastTradePrice for use in tests.""" + + def name(self): + return 'GetLastTradePrice' + + def cardinality(self): + return cardinality.Cardinality.UNARY_UNARY + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, request, response_callback, context, control): + _get_last_trade_price( + request, response_callback, control, context.is_active) + + +class GetLastTradePriceMessages(service.UnaryUnaryTestMessages): + + def __init__(self): + self._index = 0 + + def request(self): + symbol = SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest(symbol=symbol) + + def verify(self, request, response, test_case): + test_case.assertEqual(request.symbol, response.symbol) + test_case.assertEqual(_price(request.symbol), response.price) + + +class GetLastTradePriceMultiple(service.StreamStreamTestMethod): + """GetLastTradePriceMultiple for use in tests.""" + + def name(self): + return 'GetLastTradePriceMultiple' + + def cardinality(self): + return cardinality.Cardinality.STREAM_STREAM + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, response_consumer, context, control): + return _get_last_trade_price_multiple( + response_consumer, control, context.is_active) + + +class GetLastTradePriceMultipleMessages(service.StreamStreamTestMessages): + """Pairs of message streams for use with GetLastTradePriceMultiple.""" + + def __init__(self): + self._index = 0 + + def requests(self): + base_index = self._index + self._index += 1 + return [ + stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % (base_index + index)) + for index in range(STREAM_LENGTH)] + + def verify(self, requests, responses, test_case): + test_case.assertEqual(len(requests), len(responses)) + for stock_request, stock_reply in zip(requests, responses): + test_case.assertEqual(stock_request.symbol, stock_reply.symbol) + test_case.assertEqual(_price(stock_request.symbol), stock_reply.price) + + +class WatchFutureTrades(service.UnaryStreamTestMethod): + """WatchFutureTrades for use in tests.""" + + def name(self): + return 'WatchFutureTrades' + + def cardinality(self): + return cardinality.Cardinality.UNARY_STREAM + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, request, response_consumer, context, control): + _watch_future_trades(request, response_consumer, control, context.is_active) + + +class WatchFutureTradesMessages(service.UnaryStreamTestMessages): + """Pairs of a single request message and a sequence of response messages.""" + + def __init__(self): + self._index = 0 + + def request(self): + symbol = SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest( + symbol=symbol, num_trades_to_watch=STREAM_LENGTH) + + def verify(self, request, responses, test_case): + test_case.assertEqual(STREAM_LENGTH, len(responses)) + base_price = _price(request.symbol) + for index, response in enumerate(responses): + test_case.assertEqual(base_price + index, response.price) + + +class GetHighestTradePrice(service.StreamUnaryTestMethod): + """GetHighestTradePrice for use in tests.""" + + def name(self): + return 'GetHighestTradePrice' + + def cardinality(self): + return cardinality.Cardinality.STREAM_UNARY + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, response_callback, context, control): + return _get_highest_trade_price( + response_callback, control, context.is_active) + + +class GetHighestTradePriceMessages(service.StreamUnaryTestMessages): + + def requests(self): + return [ + stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % index) + for index in range(STREAM_LENGTH)] + + def verify(self, requests, response, test_case): + price = None + symbol = None + for stock_request in requests: + current_symbol = stock_request.symbol + current_price = _price(current_symbol) + if price is None or price < current_price: + price = current_price + symbol = current_symbol + test_case.assertEqual(price, response.price) + test_case.assertEqual(symbol, response.symbol) + + +class StockTestService(service.TestService): + """A corpus of test data with one method of each RPC cardinality.""" + + def name(self): + return 'Stock' + + def unary_unary_scenarios(self): + return { + 'GetLastTradePrice': ( + GetLastTradePrice(), [GetLastTradePriceMessages()]), + } + + def unary_stream_scenarios(self): + return { + 'WatchFutureTrades': ( + WatchFutureTrades(), [WatchFutureTradesMessages()]), + } + + def stream_unary_scenarios(self): + return { + 'GetHighestTradePrice': ( + GetHighestTradePrice(), [GetHighestTradePriceMessages()]) + } + + def stream_stream_scenarios(self): + return { + 'GetLastTradePriceMultiple': ( + GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]), + } + + +STOCK_TEST_SERVICE = StockTestService() diff --git a/src/python/_framework/face/testing/test_case.py b/src/python/_framework/face/testing/test_case.py new file mode 100644 index 0000000000..09b5a67f5a --- /dev/null +++ b/src/python/_framework/face/testing/test_case.py @@ -0,0 +1,111 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tools for creating tests of implementations of the Face layer.""" + +import abc + +# face_interfaces and interfaces are referenced in specification in this module. +from _framework.face import interfaces as face_interfaces # pylint: disable=unused-import +from _framework.face.testing import interfaces # pylint: disable=unused-import + + +class FaceTestCase(object): + """Describes a test of the Face Layer of RPC Framework. + + Concrete subclasses must also inherit from unittest.TestCase and from at least + one class that defines test methods. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_up_implementation( + self, + name, + methods, + inline_value_in_value_out_methods, + inline_value_in_stream_out_methods, + inline_stream_in_value_out_methods, + inline_stream_in_stream_out_methods, + event_value_in_value_out_methods, + event_value_in_stream_out_methods, + event_stream_in_value_out_methods, + event_stream_in_stream_out_methods, + multi_method): + """Instantiates the Face Layer implementation under test. + + Args: + name: The service name to be used in the test. + methods: A sequence of interfaces.Method objects describing the RPC + methods that will be called during the test. + inline_value_in_value_out_methods: A dictionary from string method names + to face_interfaces.InlineValueInValueOutMethod implementations of those + methods. + inline_value_in_stream_out_methods: A dictionary from string method names + to face_interfaces.InlineValueInStreamOutMethod implementations of those + methods. + inline_stream_in_value_out_methods: A dictionary from string method names + to face_interfaces.InlineStreamInValueOutMethod implementations of those + methods. + inline_stream_in_stream_out_methods: A dictionary from string method names + to face_interfaces.InlineStreamInStreamOutMethod implementations of + those methods. + event_value_in_value_out_methods: A dictionary from string method names + to face_interfaces.EventValueInValueOutMethod implementations of those + methods. + event_value_in_stream_out_methods: A dictionary from string method names + to face_interfaces.EventValueInStreamOutMethod implementations of those + methods. + event_stream_in_value_out_methods: A dictionary from string method names + to face_interfaces.EventStreamInValueOutMethod implementations of those + methods. + event_stream_in_stream_out_methods: A dictionary from string method names + to face_interfaces.EventStreamInStreamOutMethod implementations of those + methods. + multi_method: An face_interfaces.MultiMethod, or None. + + Returns: + A sequence of length three the first element of which is a + face_interfaces.Server, the second element of which is a + face_interfaces.Stub, (both of which are backed by the given method + implementations), and the third element of which is an arbitrary memo + object to be kept and passed to tearDownImplementation at the conclusion + of the test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def tear_down_implementation(self, memo): + """Destroys the Face layer implementation under test. + + Args: + memo: The object from the third position of the return value of + set_up_implementation. + """ + raise NotImplementedError() diff --git a/src/python/_framework/foundation/__init__.py b/src/python/_framework/foundation/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/_framework/foundation/__init__.py diff --git a/src/python/_framework/foundation/_later_test.py b/src/python/_framework/foundation/_later_test.py new file mode 100644 index 0000000000..fbd17a4ad9 --- /dev/null +++ b/src/python/_framework/foundation/_later_test.py @@ -0,0 +1,145 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests of the later module.""" + +import threading +import time +import unittest + +from _framework.foundation import future +from _framework.foundation import later + +TICK = 0.1 + + +class LaterTest(unittest.TestCase): + + def test_simple_delay(self): + lock = threading.Lock() + cell = [0] + def increment_cell(): + with lock: + cell[0] += 1 + computation_future = later.later(TICK * 2, increment_cell) + self.assertFalse(computation_future.done()) + self.assertFalse(computation_future.cancelled()) + time.sleep(TICK) + self.assertFalse(computation_future.done()) + self.assertFalse(computation_future.cancelled()) + with lock: + self.assertEqual(0, cell[0]) + time.sleep(TICK * 2) + self.assertTrue(computation_future.done()) + self.assertFalse(computation_future.cancelled()) + with lock: + self.assertEqual(1, cell[0]) + outcome = computation_future.outcome() + self.assertEqual(future.RETURNED, outcome.category) + + def test_callback(self): + lock = threading.Lock() + cell = [0] + callback_called = [False] + outcome_passed_to_callback = [None] + def increment_cell(): + with lock: + cell[0] += 1 + computation_future = later.later(TICK * 2, increment_cell) + def callback(outcome): + with lock: + callback_called[0] = True + outcome_passed_to_callback[0] = outcome + computation_future.add_done_callback(callback) + time.sleep(TICK) + with lock: + self.assertFalse(callback_called[0]) + time.sleep(TICK * 2) + with lock: + self.assertTrue(callback_called[0]) + self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + + callback_called[0] = False + outcome_passed_to_callback[0] = None + + computation_future.add_done_callback(callback) + with lock: + self.assertTrue(callback_called[0]) + self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + + def test_cancel(self): + lock = threading.Lock() + cell = [0] + callback_called = [False] + outcome_passed_to_callback = [None] + def increment_cell(): + with lock: + cell[0] += 1 + computation_future = later.later(TICK * 2, increment_cell) + def callback(outcome): + with lock: + callback_called[0] = True + outcome_passed_to_callback[0] = outcome + computation_future.add_done_callback(callback) + time.sleep(TICK) + with lock: + self.assertFalse(callback_called[0]) + computation_future.cancel() + self.assertTrue(computation_future.cancelled()) + self.assertFalse(computation_future.done()) + self.assertEqual(future.ABORTED, computation_future.outcome().category) + with lock: + self.assertTrue(callback_called[0]) + self.assertEqual(future.ABORTED, outcome_passed_to_callback[0].category) + + def test_outcome(self): + lock = threading.Lock() + cell = [0] + callback_called = [False] + outcome_passed_to_callback = [None] + def increment_cell(): + with lock: + cell[0] += 1 + computation_future = later.later(TICK * 2, increment_cell) + def callback(outcome): + with lock: + callback_called[0] = True + outcome_passed_to_callback[0] = outcome + computation_future.add_done_callback(callback) + returned_outcome = computation_future.outcome() + self.assertEqual(future.RETURNED, returned_outcome.category) + + # The callback may not yet have been called! Sleep a tick. + time.sleep(TICK) + with lock: + self.assertTrue(callback_called[0]) + self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/_framework/foundation/_logging_pool_test.py b/src/python/_framework/foundation/_logging_pool_test.py new file mode 100644 index 0000000000..f2224d80e5 --- /dev/null +++ b/src/python/_framework/foundation/_logging_pool_test.py @@ -0,0 +1,64 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests for _framework.foundation.logging_pool.""" + +import unittest + +from _framework.foundation import logging_pool + +_POOL_SIZE = 16 + + +class LoggingPoolTest(unittest.TestCase): + + def testUpAndDown(self): + pool = logging_pool.pool(_POOL_SIZE) + pool.shutdown(wait=True) + + with logging_pool.pool(_POOL_SIZE) as pool: + self.assertIsNotNone(pool) + + def testTaskExecuted(self): + test_list = [] + + with logging_pool.pool(_POOL_SIZE) as pool: + pool.submit(lambda: test_list.append(object())).result() + + self.assertTrue(test_list) + + def testException(self): + with logging_pool.pool(_POOL_SIZE) as pool: + raised_exception = pool.submit(lambda: 1/0).exception() + + self.assertIsNotNone(raised_exception) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/python/_framework/foundation/_timer_future.py b/src/python/_framework/foundation/_timer_future.py new file mode 100644 index 0000000000..86bc073d56 --- /dev/null +++ b/src/python/_framework/foundation/_timer_future.py @@ -0,0 +1,156 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Affords a Future implementation based on Python's threading.Timer.""" + +import threading +import time + +from _framework.foundation import future + + +class TimerFuture(future.Future): + """A Future implementation based around Timer objects.""" + + def __init__(self, compute_time, computation): + """Constructor. + + Args: + compute_time: The time after which to begin this future's computation. + computation: The computation to be performed within this Future. + """ + self._lock = threading.Lock() + self._compute_time = compute_time + self._computation = computation + self._timer = None + self._computing = False + self._computed = False + self._cancelled = False + self._outcome = None + self._waiting = [] + + def _compute(self): + """Performs the computation embedded in this Future. + + Or doesn't, if the time to perform it has not yet arrived. + """ + with self._lock: + time_remaining = self._compute_time - time.time() + if 0 < time_remaining: + self._timer = threading.Timer(time_remaining, self._compute) + self._timer.start() + return + else: + self._computing = True + + try: + returned_value = self._computation() + outcome = future.returned(returned_value) + except Exception as e: # pylint: disable=broad-except + outcome = future.raised(e) + + with self._lock: + self._computing = False + self._computed = True + self._outcome = outcome + waiting = self._waiting + + for callback in waiting: + callback(outcome) + + def start(self): + """Starts this Future. + + This must be called exactly once, immediately after construction. + """ + with self._lock: + self._timer = threading.Timer( + self._compute_time - time.time(), self._compute) + self._timer.start() + + def cancel(self): + """See future.Future.cancel for specification.""" + with self._lock: + if self._computing or self._computed: + return False + elif self._cancelled: + return True + else: + self._timer.cancel() + self._cancelled = True + self._outcome = future.aborted() + outcome = self._outcome + waiting = self._waiting + + for callback in waiting: + try: + callback(outcome) + except Exception: # pylint: disable=broad-except + pass + + return True + + def cancelled(self): + """See future.Future.cancelled for specification.""" + with self._lock: + return self._cancelled + + def done(self): + """See future.Future.done for specification.""" + with self._lock: + return self._computed + + def outcome(self): + """See future.Future.outcome for specification.""" + with self._lock: + if self._computed or self._cancelled: + return self._outcome + + condition = threading.Condition() + def notify_condition(unused_outcome): + with condition: + condition.notify() + self._waiting.append(notify_condition) + + with condition: + condition.wait() + + with self._lock: + return self._outcome + + def add_done_callback(self, callback): + """See future.Future.add_done_callback for specification.""" + with self._lock: + if not self._computed and not self._cancelled: + self._waiting.append(callback) + return + else: + outcome = self._outcome + + callback(outcome) diff --git a/src/python/_framework/foundation/abandonment.py b/src/python/_framework/foundation/abandonment.py new file mode 100644 index 0000000000..960b4d06b4 --- /dev/null +++ b/src/python/_framework/foundation/abandonment.py @@ -0,0 +1,38 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for indicating abandonment of computation.""" + + +class Abandoned(Exception): + """Indicates that some computation is being abandoned. + + Abandoning a computation is different than returning a value or raising + an exception indicating some operational or programming defect. + """ diff --git a/src/python/_framework/foundation/callable_util.py b/src/python/_framework/foundation/callable_util.py new file mode 100644 index 0000000000..1f7546cb76 --- /dev/null +++ b/src/python/_framework/foundation/callable_util.py @@ -0,0 +1,78 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for working with callables.""" + +import functools +import logging + +from _framework.foundation import future + + +def _call_logging_exceptions(behavior, message, *args, **kwargs): + try: + return future.returned(behavior(*args, **kwargs)) + except Exception as e: # pylint: disable=broad-except + logging.exception(message) + return future.raised(e) + + +def with_exceptions_logged(behavior, message): + """Wraps a callable in a try-except that logs any exceptions it raises. + + Args: + behavior: Any callable. + message: A string to log if the behavior raises an exception. + + Returns: + A callable that when executed invokes the given behavior. The returned + callable takes the same arguments as the given behavior but returns a + future.Outcome describing whether the given behavior returned a value or + raised an exception. + """ + @functools.wraps(behavior) + def wrapped_behavior(*args, **kwargs): + return _call_logging_exceptions(behavior, message, *args, **kwargs) + return wrapped_behavior + + +def call_logging_exceptions(behavior, message, *args, **kwargs): + """Calls a behavior in a try-except that logs any exceptions it raises. + + Args: + behavior: Any callable. + message: A string to log if the behavior raises an exception. + *args: Positional arguments to pass to the given behavior. + **kwargs: Keyword arguments to pass to the given behavior. + + Returns: + A future.Outcome describing whether the given behavior returned a value or + raised an exception. + """ + return _call_logging_exceptions(behavior, message, *args, **kwargs) diff --git a/src/python/_framework/foundation/future.py b/src/python/_framework/foundation/future.py new file mode 100644 index 0000000000..f00c503257 --- /dev/null +++ b/src/python/_framework/foundation/future.py @@ -0,0 +1,172 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""The Future interface missing from Python's standard library. + +Python's concurrent.futures library defines a Future class very much like the +Future defined here, but since that class is concrete and without construction +semantics it is only available within the concurrent.futures library itself. +The Future class defined here is an entirely abstract interface that anyone may +implement and use. +""" + +import abc +import collections + +RETURNED = object() +RAISED = object() +ABORTED = object() + + +class Outcome(object): + """A sum type describing the outcome of some computation. + + Attributes: + category: One of RETURNED, RAISED, or ABORTED, respectively indicating + that the computation returned a value, raised an exception, or was + aborted. + return_value: The value returned by the computation. Must be present if + category is RETURNED. + exception: The exception raised by the computation. Must be present if + category is RAISED. + """ + __metaclass__ = abc.ABCMeta + + +class _EasyOutcome( + collections.namedtuple('_EasyOutcome', + ['category', 'return_value', 'exception']), + Outcome): + """A trivial implementation of Outcome.""" + +# All Outcomes describing abortion are indistinguishable so there might as well +# be only one. +_ABORTED_OUTCOME = _EasyOutcome(ABORTED, None, None) + + +def aborted(): + """Returns an Outcome indicating that a computation was aborted. + + Returns: + An Outcome indicating that a computation was aborted. + """ + return _ABORTED_OUTCOME + + +def raised(exception): + """Returns an Outcome indicating that a computation raised an exception. + + Args: + exception: The exception raised by the computation. + + Returns: + An Outcome indicating that a computation raised the given exception. + """ + return _EasyOutcome(RAISED, None, exception) + + +def returned(value): + """Returns an Outcome indicating that a computation returned a value. + + Args: + value: The value returned by the computation. + + Returns: + An Outcome indicating that a computation returned the given value. + """ + return _EasyOutcome(RETURNED, value, None) + + +class Future(object): + """A representation of a computation happening in another control flow. + + Computations represented by a Future may have already completed, may be + ongoing, or may be yet to be begun. + + Computations represented by a Future are considered uninterruptable; once + started they will be allowed to terminate either by returning or raising + an exception. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def cancel(self): + """Attempts to cancel the computation. + + Returns: + True if the computation will not be allowed to take place or False if + the computation has already taken place or is currently taking place. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cancelled(self): + """Describes whether the computation was cancelled. + + Returns: + True if the computation was cancelled and did not take place or False + if the computation took place, is taking place, or is scheduled to + take place in the future. + """ + raise NotImplementedError() + + @abc.abstractmethod + def done(self): + """Describes whether the computation has taken place. + + Returns: + True if the computation took place; False otherwise. + """ + raise NotImplementedError() + + @abc.abstractmethod + def outcome(self): + """Accesses the outcome of the computation. + + If the computation has not yet completed, this method blocks until it has. + + Returns: + An Outcome describing the outcome of the computation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_done_callback(self, callback): + """Adds a function to be called at completion of the computation. + + The callback will be passed an Outcome object describing the outcome of + the computation. + + If the computation has already completed, the callback will be called + immediately. + + Args: + callback: A callable taking an Outcome as its single parameter. + """ + raise NotImplementedError() diff --git a/src/python/_framework/foundation/later.py b/src/python/_framework/foundation/later.py new file mode 100644 index 0000000000..fc2cf578d0 --- /dev/null +++ b/src/python/_framework/foundation/later.py @@ -0,0 +1,51 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Enables scheduling execution at a later time.""" + +import time + +from _framework.foundation import _timer_future + + +def later(delay, computation): + """Schedules later execution of a callable. + + Args: + delay: Any numeric value. Represents the minimum length of time in seconds + to allow to pass before beginning the computation. No guarantees are made + about the maximum length of time that will pass. + computation: A callable that accepts no arguments. + + Returns: + A Future representing the scheduled computation. + """ + timer_future = _timer_future.TimerFuture(time.time() + delay, computation) + timer_future.start() + return timer_future diff --git a/src/python/_framework/foundation/logging_pool.py b/src/python/_framework/foundation/logging_pool.py new file mode 100644 index 0000000000..7c7a6eebfc --- /dev/null +++ b/src/python/_framework/foundation/logging_pool.py @@ -0,0 +1,83 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A thread pool that logs exceptions raised by tasks executed within it.""" + +import functools +import logging + +from concurrent import futures + + +def _wrap(behavior): + """Wraps an arbitrary callable behavior in exception-logging.""" + @functools.wraps(behavior) + def _wrapping(*args, **kwargs): + try: + return behavior(*args, **kwargs) + except Exception as e: + logging.exception('Unexpected exception from task run in logging pool!') + raise + return _wrapping + + +class _LoggingPool(object): + """An exception-logging futures.ThreadPoolExecutor-compatible thread pool.""" + + def __init__(self, backing_pool): + self._backing_pool = backing_pool + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._backing_pool.shutdown(wait=True) + + def submit(self, fn, *args, **kwargs): + return self._backing_pool.submit(_wrap(fn), *args, **kwargs) + + def map(self, func, *iterables, **kwargs): + return self._backing_pool.map( + _wrap(func), *iterables, timeout=kwargs.get('timeout', None)) + + def shutdown(self, wait=True): + self._backing_pool.shutdown(wait=wait) + + +def pool(max_workers): + """Creates a thread pool that logs exceptions raised by the tasks within it. + + Args: + max_workers: The maximum number of worker threads to allow the pool. + + Returns: + A futures.ThreadPoolExecutor-compatible thread pool that logs exceptions + raised by the tasks executed within it. + """ + return _LoggingPool(futures.ThreadPoolExecutor(max_workers)) diff --git a/src/python/_framework/foundation/stream.py b/src/python/_framework/foundation/stream.py new file mode 100644 index 0000000000..75c0cf145b --- /dev/null +++ b/src/python/_framework/foundation/stream.py @@ -0,0 +1,60 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces related to streams of values or objects.""" + +import abc + + +class Consumer(object): + """Interface for consumers of finite streams of values or objects.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def consume(self, value): + """Accepts a value. + + Args: + value: Any value accepted by this Consumer. + """ + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self): + """Indicates to this Consumer that no more values will be supplied.""" + raise NotImplementedError() + + @abc.abstractmethod + def consume_and_terminate(self, value): + """Supplies a value and signals that no more values will be supplied. + + Args: + value: Any value accepted by this Consumer. + """ + raise NotImplementedError() diff --git a/src/python/_framework/foundation/stream_testing.py b/src/python/_framework/foundation/stream_testing.py new file mode 100644 index 0000000000..c1acedc5c6 --- /dev/null +++ b/src/python/_framework/foundation/stream_testing.py @@ -0,0 +1,73 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for testing stream-related code.""" + +from _framework.foundation import stream + + +class TestConsumer(stream.Consumer): + """A stream.Consumer instrumented for testing. + + Attributes: + calls: A sequence of value-termination pairs describing the history of calls + made on this object. + """ + + def __init__(self): + self.calls = [] + + def consume(self, value): + """See stream.Consumer.consume for specification.""" + self.calls.append((value, False)) + + def terminate(self): + """See stream.Consumer.terminate for specification.""" + self.calls.append((None, True)) + + def consume_and_terminate(self, value): + """See stream.Consumer.consume_and_terminate for specification.""" + self.calls.append((value, True)) + + def is_legal(self): + """Reports whether or not a legal sequence of calls has been made.""" + terminated = False + for value, terminal in self.calls: + if terminated: + return False + elif terminal: + terminated = True + elif value is None: + return False + else: # pylint: disable=useless-else-on-loop + return True + + def values(self): + """Returns the sequence of values that have been passed to this Consumer.""" + return [value for value, _ in self.calls if value] diff --git a/src/python/_framework/foundation/stream_util.py b/src/python/_framework/foundation/stream_util.py new file mode 100644 index 0000000000..3a9c043316 --- /dev/null +++ b/src/python/_framework/foundation/stream_util.py @@ -0,0 +1,160 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Helpful utilities related to the stream module.""" + +import logging +import threading + +from _framework.foundation import stream + +_NO_VALUE = object() + + +class TransformingConsumer(stream.Consumer): + """A stream.Consumer that passes a transformation of its input to another.""" + + def __init__(self, transformation, downstream): + self._transformation = transformation + self._downstream = downstream + + def consume(self, value): + self._downstream.consume(self._transformation(value)) + + def terminate(self): + self._downstream.terminate() + + def consume_and_terminate(self, value): + self._downstream.consume_and_terminate(self._transformation(value)) + + +class IterableConsumer(stream.Consumer): + """A Consumer that when iterated over emits the values it has consumed.""" + + def __init__(self): + self._condition = threading.Condition() + self._values = [] + self._active = True + + def consume(self, stock_reply): + with self._condition: + if self._active: + self._values.append(stock_reply) + self._condition.notify() + + def terminate(self): + with self._condition: + self._active = False + self._condition.notify() + + def consume_and_terminate(self, stock_reply): + with self._condition: + if self._active: + self._values.append(stock_reply) + self._active = False + self._condition.notify() + + def __iter__(self): + return self + + def next(self): + with self._condition: + while self._active and not self._values: + self._condition.wait() + if self._values: + return self._values.pop(0) + else: + raise StopIteration() + + +class ThreadSwitchingConsumer(stream.Consumer): + """A Consumer decorator that affords serialization and asynchrony.""" + + def __init__(self, sink, pool): + self._lock = threading.Lock() + self._sink = sink + self._pool = pool + # True if self._spin has been submitted to the pool to be called once and + # that call has not yet returned, False otherwise. + self._spinning = False + self._values = [] + self._active = True + + def _spin(self, sink, value, terminate): + while True: + try: + if value is _NO_VALUE: + sink.terminate() + elif terminate: + sink.consume_and_terminate(value) + else: + sink.consume(value) + except Exception as e: # pylint:disable=broad-except + logging.exception(e) + + with self._lock: + if terminate: + self._spinning = False + return + elif self._values: + value = self._values.pop(0) + terminate = not self._values and not self._active + elif not self._active: + value = _NO_VALUE + terminate = True + else: + self._spinning = False + return + + def consume(self, value): + with self._lock: + if self._active: + if self._spinning: + self._values.append(value) + else: + self._pool.submit(self._spin, self._sink, value, False) + self._spinning = True + + def terminate(self): + with self._lock: + if self._active: + self._active = False + if not self._spinning: + self._pool.submit(self._spin, self._sink, _NO_VALUE, True) + self._spinning = True + + def consume_and_terminate(self, value): + with self._lock: + if self._active: + self._active = False + if self._spinning: + self._values.append(value) + else: + self._pool.submit(self._spin, self._sink, value, True) + self._spinning = True diff --git a/src/python/_junkdrawer/__init__.py b/src/python/_junkdrawer/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/_junkdrawer/__init__.py diff --git a/src/python/_junkdrawer/stock_pb2.py b/src/python/_junkdrawer/stock_pb2.py new file mode 100644 index 0000000000..eef18f82d6 --- /dev/null +++ b/src/python/_junkdrawer/stock_pb2.py @@ -0,0 +1,152 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# TODO(nathaniel): Remove this from source control after having made +# generation from the stock.proto source part of GRPC's build-and-test +# process. + +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: stock.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='stock.proto', + package='stock', + serialized_pb=_b('\n\x0bstock.proto\x12\x05stock\">\n\x0cStockRequest\x12\x0e\n\x06symbol\x18\x01 \x01(\t\x12\x1e\n\x13num_trades_to_watch\x18\x02 \x01(\x05:\x01\x30\"+\n\nStockReply\x12\r\n\x05price\x18\x01 \x01(\x02\x12\x0e\n\x06symbol\x18\x02 \x01(\t2\x96\x02\n\x05Stock\x12=\n\x11GetLastTradePrice\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00\x12I\n\x19GetLastTradePriceMultiple\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00(\x01\x30\x01\x12?\n\x11WatchFutureTrades\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00\x30\x01\x12\x42\n\x14GetHighestTradePrice\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00(\x01') +) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + + + + +_STOCKREQUEST = _descriptor.Descriptor( + name='StockRequest', + full_name='stock.StockRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='symbol', full_name='stock.StockRequest.symbol', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='num_trades_to_watch', full_name='stock.StockRequest.num_trades_to_watch', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=True, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=22, + serialized_end=84, +) + + +_STOCKREPLY = _descriptor.Descriptor( + name='StockReply', + full_name='stock.StockReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='price', full_name='stock.StockReply.price', index=0, + number=1, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='symbol', full_name='stock.StockReply.symbol', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=86, + serialized_end=129, +) + +DESCRIPTOR.message_types_by_name['StockRequest'] = _STOCKREQUEST +DESCRIPTOR.message_types_by_name['StockReply'] = _STOCKREPLY + +StockRequest = _reflection.GeneratedProtocolMessageType('StockRequest', (_message.Message,), dict( + DESCRIPTOR = _STOCKREQUEST, + __module__ = 'stock_pb2' + # @@protoc_insertion_point(class_scope:stock.StockRequest) + )) +_sym_db.RegisterMessage(StockRequest) + +StockReply = _reflection.GeneratedProtocolMessageType('StockReply', (_message.Message,), dict( + DESCRIPTOR = _STOCKREPLY, + __module__ = 'stock_pb2' + # @@protoc_insertion_point(class_scope:stock.StockReply) + )) +_sym_db.RegisterMessage(StockReply) + + +# @@protoc_insertion_point(module_scope) |