diff options
Diffstat (limited to 'src/python/_framework/base/packets/_ends.py')
-rw-r--r-- | src/python/_framework/base/packets/_ends.py | 408 |
1 files changed, 408 insertions, 0 deletions
diff --git a/src/python/_framework/base/packets/_ends.py b/src/python/_framework/base/packets/_ends.py new file mode 100644 index 0000000000..baaf5cacf9 --- /dev/null +++ b/src/python/_framework/base/packets/_ends.py @@ -0,0 +1,408 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Implementations of Fronts and Backs.""" + +import collections +import threading +import uuid + +# _interfaces and packets are referenced from specification in this module. +from _framework.base import interfaces as base_interfaces +from _framework.base.packets import _cancellation +from _framework.base.packets import _context +from _framework.base.packets import _emission +from _framework.base.packets import _expiration +from _framework.base.packets import _ingestion +from _framework.base.packets import _interfaces # pylint: disable=unused-import +from _framework.base.packets import _reception +from _framework.base.packets import _termination +from _framework.base.packets import _transmission +from _framework.base.packets import interfaces +from _framework.base.packets import packets # pylint: disable=unused-import +from _framework.foundation import callable_util + +_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!' + +_OPERATION_OUTCOMES = ( + base_interfaces.COMPLETED, + base_interfaces.CANCELLED, + base_interfaces.EXPIRED, + base_interfaces.RECEPTION_FAILURE, + base_interfaces.TRANSMISSION_FAILURE, + base_interfaces.SERVICER_FAILURE, + base_interfaces.SERVICED_FAILURE, + ) + + +class _EasyOperation(base_interfaces.Operation): + """A trivial implementation of base_interfaces.Operation.""" + + def __init__(self, emission_manager, context, cancellation_manager): + """Constructor. + + Args: + emission_manager: The _interfaces.EmissionManager for the operation that + will accept values emitted by customer code. + context: The base_interfaces.OperationContext for use by the customer + during the operation. + cancellation_manager: The _interfaces.CancellationManager for the + operation. + """ + self.consumer = emission_manager + self.context = context + self._cancellation_manager = cancellation_manager + + def cancel(self): + self._cancellation_manager.cancel() + + +class _Endlette(object): + """Utility for stateful behavior common to Fronts and Backs.""" + + def __init__(self, pool): + """Constructor. + + Args: + pool: A thread pool to use when calling registered idle actions. + """ + self._lock = threading.Lock() + self._pool = pool + # Dictionary from operation IDs to ReceptionManager-or-None. A None value + # indicates an in-progress fire-and-forget operation for which the customer + # has chosen to ignore results. + self._operations = {} + self._stats = {outcome: 0 for outcome in _OPERATION_OUTCOMES} + self._idle_actions = [] + + def terminal_action(self, operation_id): + """Constructs the termination action for a single operation. + + Args: + operation_id: An operation ID. + + Returns: + A callable that takes an operation outcome for an argument to be used as + the termination action for the operation associated with the given + operation ID. + """ + def termination_action(outcome): + with self._lock: + self._stats[outcome] += 1 + self._operations.pop(operation_id, None) + if not self._operations: + for action in self._idle_actions: + self._pool.submit(callable_util.with_exceptions_logged( + action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)) + self._idle_actions = [] + return termination_action + + def __enter__(self): + self._lock.acquire() + + def __exit__(self, exc_type, exc_val, exc_tb): + self._lock.release() + + def get_operation(self, operation_id): + return self._operations.get(operation_id, None) + + def add_operation(self, operation_id, operation_reception_manager): + self._operations[operation_id] = operation_reception_manager + + def operation_stats(self): + with self._lock: + return dict(self._stats) + + def add_idle_action(self, action): + with self._lock: + if self._operations: + self._idle_actions.append(action) + else: + self._pool.submit(callable_util.with_exceptions_logged( + action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)) + + +class _FrontManagement( + collections.namedtuple( + '_FrontManagement', + ('reception', 'emission', 'operation', 'cancellation'))): + """Just a trivial helper class to bundle four fellow-traveling objects.""" + + +def _front_operate( + callback, work_pool, transmission_pool, utility_pool, + termination_action, operation_id, name, payload, complete, timeout, + subscription, trace_id): + """Constructs objects necessary for front-side operation management. + + Args: + callback: A callable that accepts packets.FrontToBackPackets and delivers + them to the other side of the operation. Execution of this callable may + take any arbitrary length of time. + work_pool: A thread pool in which to execute customer code. + transmission_pool: A thread pool to use for transmitting to the other side + of the operation. + utility_pool: A thread pool for utility tasks. + termination_action: A no-arg behavior to be called upon operation + completion. + operation_id: An object identifying the operation. + name: The name of the method being called during the operation. + payload: The first customer-significant value to be transmitted to the other + side. May be None if there is no such value or if the customer chose not + to pass it at operation invocation. + complete: A boolean indicating whether or not additional payloads will be + supplied by the customer. + timeout: A length of time in seconds to allow for the operation. + subscription: A base_interfaces.ServicedSubscription describing the + customer's interest in the results of the operation. + trace_id: A uuid.UUID identifying a set of related operations to which this + operation belongs. May be None. + + Returns: + A _FrontManagement object bundling together the + _interfaces.ReceptionManager, _interfaces.EmissionManager, + _context.OperationContext, and _interfaces.CancellationManager for the + operation. + """ + lock = threading.Lock() + with lock: + termination_manager = _termination.front_termination_manager( + work_pool, utility_pool, termination_action, subscription.category) + transmission_manager = _transmission.front_transmission_manager( + lock, transmission_pool, callback, operation_id, name, + subscription.category, trace_id, timeout, termination_manager) + operation_context = _context.OperationContext( + lock, operation_id, packets.Kind.SERVICED_FAILURE, + termination_manager, transmission_manager) + emission_manager = _emission.front_emission_manager( + lock, termination_manager, transmission_manager) + ingestion_manager = _ingestion.front_ingestion_manager( + lock, work_pool, subscription, termination_manager, + transmission_manager, operation_context) + expiration_manager = _expiration.front_expiration_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + timeout) + reception_manager = _reception.front_reception_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + expiration_manager) + cancellation_manager = _cancellation.CancellationManager( + lock, termination_manager, transmission_manager, ingestion_manager, + expiration_manager) + + transmission_manager.set_ingestion_and_expiration_managers( + ingestion_manager, expiration_manager) + operation_context.set_ingestion_and_expiration_managers( + ingestion_manager, expiration_manager) + emission_manager.set_ingestion_manager_and_expiration_manager( + ingestion_manager, expiration_manager) + ingestion_manager.set_expiration_manager(expiration_manager) + + transmission_manager.inmit(payload, complete) + + returned_reception_manager = ( + None if subscription.category == base_interfaces.NONE + else reception_manager) + + return _FrontManagement( + returned_reception_manager, emission_manager, operation_context, + cancellation_manager) + + +class Front(interfaces.Front): + """An implementation of interfaces.Front.""" + + def __init__(self, work_pool, transmission_pool, utility_pool): + """Constructor. + + Args: + work_pool: A thread pool to be used for executing customer code. + transmission_pool: A thread pool to be used for transmitting values to + the other side of the operation. + utility_pool: A thread pool to be used for utility tasks. + """ + self._endlette = _Endlette(utility_pool) + self._work_pool = work_pool + self._transmission_pool = transmission_pool + self._utility_pool = utility_pool + self._callback = None + + self._operations = {} + + def join_rear_link(self, rear_link): + """See interfaces.ForeLink.join_rear_link for specification.""" + with self._endlette: + self._callback = rear_link.accept_front_to_back_ticket + + def operation_stats(self): + """See base_interfaces.End.operation_stats for specification.""" + return self._endlette.operation_stats() + + def add_idle_action(self, action): + """See base_interfaces.End.add_idle_action for specification.""" + self._endlette.add_idle_action(action) + + def operate( + self, name, payload, complete, timeout, subscription, trace_id): + """See base_interfaces.Front.operate for specification.""" + operation_id = uuid.uuid4() + with self._endlette: + management = _front_operate( + self._callback, self._work_pool, self._transmission_pool, + self._utility_pool, self._endlette.terminal_action(operation_id), + operation_id, name, payload, complete, timeout, subscription, + trace_id) + self._endlette.add_operation(operation_id, management.reception) + return _EasyOperation( + management.emission, management.operation, management.cancellation) + + def accept_back_to_front_ticket(self, ticket): + """See interfaces.End.act for specification.""" + with self._endlette: + reception_manager = self._endlette.get_operation(ticket.operation_id) + if reception_manager: + reception_manager.receive_packet(ticket) + + +def _back_operate( + servicer, callback, work_pool, transmission_pool, utility_pool, + termination_action, ticket, default_timeout, maximum_timeout): + """Constructs objects necessary for back-side operation management. + + Also begins back-side operation by feeding the first received ticket into the + constructed _interfaces.ReceptionManager. + + Args: + servicer: An interfaces.Servicer for servicing operations. + callback: A callable that accepts packets.BackToFrontPackets and delivers + them to the other side of the operation. Execution of this callable may + take any arbitrary length of time. + work_pool: A thread pool in which to execute customer code. + transmission_pool: A thread pool to use for transmitting to the other side + of the operation. + utility_pool: A thread pool for utility tasks. + termination_action: A no-arg behavior to be called upon operation + completion. + ticket: The first packets.FrontToBackPacket received for the operation. + default_timeout: A length of time in seconds to be used as the default + time alloted for a single operation. + maximum_timeout: A length of time in seconds to be used as the maximum + time alloted for a single operation. + + Returns: + The _interfaces.ReceptionManager to be used for the operation. + """ + lock = threading.Lock() + with lock: + termination_manager = _termination.back_termination_manager( + work_pool, utility_pool, termination_action, ticket.subscription) + transmission_manager = _transmission.back_transmission_manager( + lock, transmission_pool, callback, ticket.operation_id, + termination_manager, ticket.subscription) + operation_context = _context.OperationContext( + lock, ticket.operation_id, packets.Kind.SERVICER_FAILURE, + termination_manager, transmission_manager) + emission_manager = _emission.back_emission_manager( + lock, termination_manager, transmission_manager) + ingestion_manager = _ingestion.back_ingestion_manager( + lock, work_pool, servicer, termination_manager, + transmission_manager, operation_context, emission_manager) + expiration_manager = _expiration.back_expiration_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + ticket.timeout, default_timeout, maximum_timeout) + reception_manager = _reception.back_reception_manager( + lock, termination_manager, transmission_manager, ingestion_manager, + expiration_manager) + + transmission_manager.set_ingestion_and_expiration_managers( + ingestion_manager, expiration_manager) + operation_context.set_ingestion_and_expiration_managers( + ingestion_manager, expiration_manager) + emission_manager.set_ingestion_manager_and_expiration_manager( + ingestion_manager, expiration_manager) + ingestion_manager.set_expiration_manager(expiration_manager) + + reception_manager.receive_packet(ticket) + + return reception_manager + + +class Back(interfaces.Back): + """An implementation of interfaces.Back.""" + + def __init__( + self, servicer, work_pool, transmission_pool, utility_pool, + default_timeout, maximum_timeout): + """Constructor. + + Args: + servicer: An interfaces.Servicer for servicing operations. + work_pool: A thread pool in which to execute customer code. + transmission_pool: A thread pool to use for transmitting to the other side + of the operation. + utility_pool: A thread pool for utility tasks. + default_timeout: A length of time in seconds to be used as the default + time alloted for a single operation. + maximum_timeout: A length of time in seconds to be used as the maximum + time alloted for a single operation. + """ + self._endlette = _Endlette(utility_pool) + self._servicer = servicer + self._work_pool = work_pool + self._transmission_pool = transmission_pool + self._utility_pool = utility_pool + self._default_timeout = default_timeout + self._maximum_timeout = maximum_timeout + self._callback = None + + def join_fore_link(self, fore_link): + """See interfaces.RearLink.join_fore_link for specification.""" + with self._endlette: + self._callback = fore_link.accept_back_to_front_ticket + + def accept_front_to_back_ticket(self, ticket): + """See interfaces.RearLink.accept_front_to_back_ticket for specification.""" + with self._endlette: + reception_manager = self._endlette.get_operation(ticket.operation_id) + if reception_manager is None: + reception_manager = _back_operate( + self._servicer, self._callback, self._work_pool, + self._transmission_pool, self._utility_pool, + self._endlette.terminal_action(ticket.operation_id), ticket, + self._default_timeout, self._maximum_timeout) + self._endlette.add_operation(ticket.operation_id, reception_manager) + else: + reception_manager.receive_packet(ticket) + + def operation_stats(self): + """See base_interfaces.End.operation_stats for specification.""" + return self._endlette.operation_stats() + + def add_idle_action(self, action): + """See base_interfaces.End.add_idle_action for specification.""" + self._endlette.add_idle_action(action) |