diff options
Diffstat (limited to 'src/python/_framework/base/packets/_ingestion.py')
-rw-r--r-- | src/python/_framework/base/packets/_ingestion.py | 440 |
1 files changed, 440 insertions, 0 deletions
diff --git a/src/python/_framework/base/packets/_ingestion.py b/src/python/_framework/base/packets/_ingestion.py new file mode 100644 index 0000000000..ad5ed4cada --- /dev/null +++ b/src/python/_framework/base/packets/_ingestion.py @@ -0,0 +1,440 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for ingestion during an operation.""" + +import abc +import collections + +from _framework.base import exceptions +from _framework.base import interfaces +from _framework.base.packets import _constants +from _framework.base.packets import _interfaces +from _framework.base.packets import packets +from _framework.foundation import abandonment +from _framework.foundation import callable_util +from _framework.foundation import stream + +_CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!' +_CONSUME_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!' + + +class _ConsumerCreation(collections.namedtuple( + '_ConsumerCreation', ('consumer', 'remote_error', 'abandoned'))): + """A sum type for the outcome of ingestion initialization. + + Either consumer will be non-None, remote_error will be True, or abandoned will + be True. + + Attributes: + consumer: A stream.Consumer for ingesting payloads. + remote_error: A boolean indicating that the consumer could not be created + due to an error on the remote side of the operation. + abandoned: A boolean indicating that the consumer creation was abandoned. + """ + + +class _EmptyConsumer(stream.Consumer): + """A no-operative stream.Consumer that ignores all inputs and calls.""" + + def consume(self, value): + """See stream.Consumer.consume for specification.""" + + def terminate(self): + """See stream.Consumer.terminate for specification.""" + + def consume_and_terminate(self, value): + """See stream.Consumer.consume_and_terminate for specification.""" + + +class _ConsumerCreator(object): + """Common specification of different consumer-creating behavior.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def create_consumer(self, requirement): + """Creates the stream.Consumer to which customer payloads will be delivered. + + Any exceptions raised by this method should be attributed to and treated as + defects in the serviced or servicer code called by this method. + + Args: + requirement: A value required by this _ConsumerCreator for consumer + creation. + + Returns: + A _ConsumerCreation describing the result of consumer creation. + """ + raise NotImplementedError() + + +class _FrontConsumerCreator(_ConsumerCreator): + """A _ConsumerCreator appropriate for front-side use.""" + + def __init__(self, subscription, operation_context): + """Constructor. + + Args: + subscription: The serviced's interfaces.ServicedSubscription for the + operation. + operation_context: The interfaces.OperationContext object for the + operation. + """ + self._subscription = subscription + self._operation_context = operation_context + + def create_consumer(self, requirement): + """See _ConsumerCreator.create_consumer for specification.""" + if self._subscription.category == interfaces.FULL: + try: + return _ConsumerCreation( + self._subscription.ingestor.consumer(self._operation_context), + False, False) + except abandonment.Abandoned: + return _ConsumerCreation(None, False, True) + else: + return _ConsumerCreation(_EmptyConsumer(), False, False) + + +class _BackConsumerCreator(_ConsumerCreator): + """A _ConsumerCreator appropriate for back-side use.""" + + def __init__(self, servicer, operation_context, emission_consumer): + """Constructor. + + Args: + servicer: The interfaces.Servicer that will service the operation. + operation_context: The interfaces.OperationContext object for the + operation. + emission_consumer: The stream.Consumer object to which payloads emitted + from the operation will be passed. + """ + self._servicer = servicer + self._operation_context = operation_context + self._emission_consumer = emission_consumer + + def create_consumer(self, requirement): + """See _ConsumerCreator.create_consumer for full specification. + + Args: + requirement: The name of the Servicer method to be called during this + operation. + + Returns: + A _ConsumerCreation describing the result of consumer creation. + """ + try: + return _ConsumerCreation( + self._servicer.service( + requirement, self._operation_context, self._emission_consumer), + False, False) + except exceptions.NoSuchMethodError: + return _ConsumerCreation(None, True, False) + except abandonment.Abandoned: + return _ConsumerCreation(None, False, True) + + +class _WrappedConsumer(object): + """Wraps a consumer to catch the exceptions that it is allowed to throw.""" + + def __init__(self, consumer): + """Constructor. + + Args: + consumer: A stream.Consumer that may raise abandonment.Abandoned from any + of its methods. + """ + self._consumer = consumer + + def moar(self, payload, complete): + """Makes progress with the wrapped consumer. + + This method catches all exceptions allowed to be thrown by the wrapped + consumer. Any exceptions raised by this method should be blamed on the + customer-supplied consumer. + + Args: + payload: A customer-significant payload object. May be None only if + complete is True. + complete: Whether or not the end of the payload sequence has been reached. + May be False only if payload is not None. + + Returns: + True if the wrapped consumer made progress or False if the wrapped + consumer raised abandonment.Abandoned to indicate its abandonment of + progress. + """ + try: + if payload: + if complete: + self._consumer.consume_and_terminate(payload) + else: + self._consumer.consume(payload) + else: + self._consumer.terminate() + return True + except abandonment.Abandoned: + return False + + +class _IngestionManager(_interfaces.IngestionManager): + """An implementation of _interfaces.IngestionManager.""" + + def __init__( + self, lock, pool, consumer_creator, failure_kind, termination_manager, + transmission_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + consumer_creator: A _ConsumerCreator wrapping the portion of customer code + that when called returns the stream.Consumer with which the customer + code will ingest payload values. + failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or + packets.Kind.SERVICER_FAILURE describes local failure of customer code. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + """ + self._lock = lock + self._pool = pool + self._consumer_creator = consumer_creator + self._failure_kind = failure_kind + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = None + + self._wrapped_ingestion_consumer = None + self._pending_ingestion = [] + self._ingestion_complete = False + self._processing = False + + def set_expiration_manager(self, expiration_manager): + self._expiration_manager = expiration_manager + + def _abort_internal_only(self): + self._wrapped_ingestion_consumer = None + self._pending_ingestion = None + + def _abort_and_notify(self, outcome): + self._abort_internal_only() + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.abort() + + def _next(self): + """Computes the next step for ingestion. + + Returns: + A payload, complete, continue triplet indicating what payload (if any) is + available to feed into customer code, whether or not the sequence of + payloads has terminated, and whether or not there is anything + immediately actionable to call customer code to do. + """ + if self._pending_ingestion is None: + return None, False, False + elif self._pending_ingestion: + payload = self._pending_ingestion.pop(0) + complete = self._ingestion_complete and not self._pending_ingestion + return payload, complete, True + elif self._ingestion_complete: + return None, True, True + else: + return None, False, False + + def _process(self, wrapped_ingestion_consumer, payload, complete): + """A method to call to execute customer code. + + This object's lock must *not* be held when calling this method. + + Args: + wrapped_ingestion_consumer: The _WrappedConsumer with which to pass + payloads to customer code. + payload: A customer payload. May be None only if complete is True. + complete: Whether or not the sequence of payloads to pass to the customer + has concluded. + """ + while True: + consumption_outcome = callable_util.call_logging_exceptions( + wrapped_ingestion_consumer.moar, _CONSUME_EXCEPTION_LOG_MESSAGE, + payload, complete) + if consumption_outcome.exception is None: + if consumption_outcome.return_value: + with self._lock: + if complete: + self._pending_ingestion = None + self._termination_manager.ingestion_complete() + return + else: + payload, complete, moar = self._next() + if not moar: + self._processing = False + return + else: + with self._lock: + if self._pending_ingestion is not None: + self._abort_and_notify(self._failure_kind) + self._processing = False + return + else: + with self._lock: + self._abort_and_notify(self._failure_kind) + self._processing = False + return + + def start(self, requirement): + if self._pending_ingestion is not None: + def initialize(): + consumer_creation_outcome = callable_util.call_logging_exceptions( + self._consumer_creator.create_consumer, + _CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE, requirement) + if consumer_creation_outcome.return_value is None: + with self._lock: + self._abort_and_notify(self._failure_kind) + self._processing = False + elif consumer_creation_outcome.return_value.remote_error: + with self._lock: + self._abort_and_notify(packets.Kind.RECEPTION_FAILURE) + self._processing = False + elif consumer_creation_outcome.return_value.abandoned: + with self._lock: + if self._pending_ingestion is not None: + self._abort_and_notify(self._failure_kind) + self._processing = False + else: + wrapped_ingestion_consumer = _WrappedConsumer( + consumer_creation_outcome.return_value.consumer) + with self._lock: + self._wrapped_ingestion_consumer = wrapped_ingestion_consumer + payload, complete, moar = self._next() + if not moar: + self._processing = False + return + + self._process(wrapped_ingestion_consumer, payload, complete) + + self._pool.submit( + callable_util.with_exceptions_logged( + initialize, _constants.INTERNAL_ERROR_LOG_MESSAGE)) + self._processing = True + + def consume(self, payload): + if self._ingestion_complete: + self._abort_and_notify(self._failure_kind) + elif self._pending_ingestion is not None: + if self._processing: + self._pending_ingestion.append(payload) + else: + self._pool.submit( + callable_util.with_exceptions_logged( + self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._wrapped_ingestion_consumer, payload, False) + self._processing = True + + def terminate(self): + if self._ingestion_complete: + self._abort_and_notify(self._failure_kind) + else: + self._ingestion_complete = True + if self._pending_ingestion is not None and not self._processing: + self._pool.submit( + callable_util.with_exceptions_logged( + self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._wrapped_ingestion_consumer, None, True) + self._processing = True + + def consume_and_terminate(self, payload): + if self._ingestion_complete: + self._abort_and_notify(self._failure_kind) + else: + self._ingestion_complete = True + if self._pending_ingestion is not None: + if self._processing: + self._pending_ingestion.append(payload) + else: + self._pool.submit( + callable_util.with_exceptions_logged( + self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._wrapped_ingestion_consumer, payload, True) + self._processing = True + + def abort(self): + """See _interfaces.IngestionManager.abort for specification.""" + self._abort_internal_only() + + +def front_ingestion_manager( + lock, pool, subscription, termination_manager, transmission_manager, + operation_context): + """Creates an IngestionManager appropriate for front-side use. + + Args: + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + subscription: A base_interfaces.ServicedSubscription indicating the + customer's interest in the results of the operation. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + operation_context: A base_interfaces.OperationContext for the operation. + + Returns: + An IngestionManager appropriate for front-side use. + """ + ingestion_manager = _IngestionManager( + lock, pool, _FrontConsumerCreator(subscription, operation_context), + packets.Kind.SERVICED_FAILURE, termination_manager, transmission_manager) + ingestion_manager.start(None) + return ingestion_manager + + +def back_ingestion_manager( + lock, pool, servicer, termination_manager, transmission_manager, + operation_context, emission_consumer): + """Creates an IngestionManager appropriate for back-side use. + + Args: + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + servicer: A base_interfaces.Servicer for servicing the operation. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + operation_context: A base_interfaces.OperationContext for the operation. + emission_consumer: The _interfaces.EmissionConsumer for the operation. + + Returns: + An IngestionManager appropriate for back-side use. + """ + ingestion_manager = _IngestionManager( + lock, pool, _BackConsumerCreator( + servicer, operation_context, emission_consumer), + packets.Kind.SERVICER_FAILURE, termination_manager, transmission_manager) + return ingestion_manager |