diff options
Diffstat (limited to 'src/python/grpcio/grpc/framework/base/_ingestion.py')
-rw-r--r-- | src/python/grpcio/grpc/framework/base/_ingestion.py | 443 |
1 files changed, 0 insertions, 443 deletions
diff --git a/src/python/grpcio/grpc/framework/base/_ingestion.py b/src/python/grpcio/grpc/framework/base/_ingestion.py deleted file mode 100644 index c9b10acb77..0000000000 --- a/src/python/grpcio/grpc/framework/base/_ingestion.py +++ /dev/null @@ -1,443 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""State and behavior for ingestion during an operation.""" - -import abc -import collections - -import six - -from grpc.framework.base import _constants -from grpc.framework.base import _interfaces -from grpc.framework.base import exceptions -from grpc.framework.base import interfaces -from grpc.framework.foundation import abandonment -from grpc.framework.foundation import callable_util -from grpc.framework.foundation import stream - -_CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!' -_CONSUME_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!' - - -class _ConsumerCreation(collections.namedtuple( - '_ConsumerCreation', ('consumer', 'remote_error', 'abandoned'))): - """A sum type for the outcome of ingestion initialization. - - Either consumer will be non-None, remote_error will be True, or abandoned will - be True. - - Attributes: - consumer: A stream.Consumer for ingesting payloads. - remote_error: A boolean indicating that the consumer could not be created - due to an error on the remote side of the operation. - abandoned: A boolean indicating that the consumer creation was abandoned. - """ - - -class _EmptyConsumer(stream.Consumer): - """A no-operative stream.Consumer that ignores all inputs and calls.""" - - def consume(self, value): - """See stream.Consumer.consume for specification.""" - - def terminate(self): - """See stream.Consumer.terminate for specification.""" - - def consume_and_terminate(self, value): - """See stream.Consumer.consume_and_terminate for specification.""" - - -class _ConsumerCreator(six.with_metaclass(abc.ABCMeta)): - """Common specification of different consumer-creating behavior.""" - - @abc.abstractmethod - def create_consumer(self, requirement): - """Creates the stream.Consumer to which customer payloads will be delivered. - - Any exceptions raised by this method should be attributed to and treated as - defects in the serviced or servicer code called by this method. - - Args: - requirement: A value required by this _ConsumerCreator for consumer - creation. - - Returns: - A _ConsumerCreation describing the result of consumer creation. - """ - raise NotImplementedError() - - -class _FrontConsumerCreator(_ConsumerCreator): - """A _ConsumerCreator appropriate for front-side use.""" - - def __init__(self, subscription, operation_context): - """Constructor. - - Args: - subscription: The serviced's interfaces.ServicedSubscription for the - operation. - operation_context: The interfaces.OperationContext object for the - operation. - """ - self._subscription = subscription - self._operation_context = operation_context - - def create_consumer(self, requirement): - """See _ConsumerCreator.create_consumer for specification.""" - if self._subscription.kind is interfaces.ServicedSubscription.Kind.FULL: - try: - return _ConsumerCreation( - self._subscription.ingestor.consumer(self._operation_context), - False, False) - except abandonment.Abandoned: - return _ConsumerCreation(None, False, True) - else: - return _ConsumerCreation(_EmptyConsumer(), False, False) - - -class _BackConsumerCreator(_ConsumerCreator): - """A _ConsumerCreator appropriate for back-side use.""" - - def __init__(self, servicer, operation_context, emission_consumer): - """Constructor. - - Args: - servicer: The interfaces.Servicer that will service the operation. - operation_context: The interfaces.OperationContext object for the - operation. - emission_consumer: The stream.Consumer object to which payloads emitted - from the operation will be passed. - """ - self._servicer = servicer - self._operation_context = operation_context - self._emission_consumer = emission_consumer - - def create_consumer(self, requirement): - """See _ConsumerCreator.create_consumer for full specification. - - Args: - requirement: The name of the Servicer method to be called during this - operation. - - Returns: - A _ConsumerCreation describing the result of consumer creation. - """ - try: - return _ConsumerCreation( - self._servicer.service( - requirement, self._operation_context, self._emission_consumer), - False, False) - except exceptions.NoSuchMethodError: - return _ConsumerCreation(None, True, False) - except abandonment.Abandoned: - return _ConsumerCreation(None, False, True) - - -class _WrappedConsumer(object): - """Wraps a consumer to catch the exceptions that it is allowed to throw.""" - - def __init__(self, consumer): - """Constructor. - - Args: - consumer: A stream.Consumer that may raise abandonment.Abandoned from any - of its methods. - """ - self._consumer = consumer - - def moar(self, payload, complete): - """Makes progress with the wrapped consumer. - - This method catches all exceptions allowed to be thrown by the wrapped - consumer. Any exceptions raised by this method should be blamed on the - customer-supplied consumer. - - Args: - payload: A customer-significant payload object. May be None only if - complete is True. - complete: Whether or not the end of the payload sequence has been reached. - Must be True if payload is None. - - Returns: - True if the wrapped consumer made progress or False if the wrapped - consumer raised abandonment.Abandoned to indicate its abandonment of - progress. - """ - try: - if payload is None: - self._consumer.terminate() - elif complete: - self._consumer.consume_and_terminate(payload) - else: - self._consumer.consume(payload) - return True - except abandonment.Abandoned: - return False - - -class _IngestionManager(_interfaces.IngestionManager): - """An implementation of _interfaces.IngestionManager.""" - - def __init__( - self, lock, pool, consumer_creator, failure_outcome, termination_manager, - transmission_manager): - """Constructor. - - Args: - lock: The operation-wide lock. - pool: A thread pool in which to execute customer code. - consumer_creator: A _ConsumerCreator wrapping the portion of customer code - that when called returns the stream.Consumer with which the customer - code will ingest payload values. - failure_outcome: Whichever one of - interfaces.Outcome.SERVICED_FAILURE or - interfaces.Outcome.SERVICER_FAILURE describes local failure of - customer code. - termination_manager: The _interfaces.TerminationManager for the operation. - transmission_manager: The _interfaces.TransmissionManager for the - operation. - """ - self._lock = lock - self._pool = pool - self._consumer_creator = consumer_creator - self._failure_outcome = failure_outcome - self._termination_manager = termination_manager - self._transmission_manager = transmission_manager - self._expiration_manager = None - - self._wrapped_ingestion_consumer = None - self._pending_ingestion = [] - self._ingestion_complete = False - self._processing = False - - def set_expiration_manager(self, expiration_manager): - self._expiration_manager = expiration_manager - - def _abort_internal_only(self): - self._wrapped_ingestion_consumer = None - self._pending_ingestion = None - - def _abort_and_notify(self, outcome): - self._abort_internal_only() - self._termination_manager.abort(outcome) - self._transmission_manager.abort(outcome) - self._expiration_manager.abort() - - def _next(self): - """Computes the next step for ingestion. - - Returns: - A payload, complete, continue triplet indicating what payload (if any) is - available to feed into customer code, whether or not the sequence of - payloads has terminated, and whether or not there is anything - immediately actionable to call customer code to do. - """ - if self._pending_ingestion is None: - return None, False, False - elif self._pending_ingestion: - payload = self._pending_ingestion.pop(0) - complete = self._ingestion_complete and not self._pending_ingestion - return payload, complete, True - elif self._ingestion_complete: - return None, True, True - else: - return None, False, False - - def _process(self, wrapped_ingestion_consumer, payload, complete): - """A method to call to execute customer code. - - This object's lock must *not* be held when calling this method. - - Args: - wrapped_ingestion_consumer: The _WrappedConsumer with which to pass - payloads to customer code. - payload: A customer payload. May be None only if complete is True. - complete: Whether or not the sequence of payloads to pass to the customer - has concluded. - """ - while True: - consumption_outcome = callable_util.call_logging_exceptions( - wrapped_ingestion_consumer.moar, _CONSUME_EXCEPTION_LOG_MESSAGE, - payload, complete) - if consumption_outcome.exception is None: - if consumption_outcome.return_value: - with self._lock: - if complete: - self._pending_ingestion = None - self._termination_manager.ingestion_complete() - return - else: - payload, complete, moar = self._next() - if not moar: - self._processing = False - return - else: - with self._lock: - if self._pending_ingestion is not None: - self._abort_and_notify(self._failure_outcome) - self._processing = False - return - else: - with self._lock: - self._abort_and_notify(self._failure_outcome) - self._processing = False - return - - def start(self, requirement): - if self._pending_ingestion is not None: - def initialize(): - consumer_creation_outcome = callable_util.call_logging_exceptions( - self._consumer_creator.create_consumer, - _CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE, requirement) - if consumer_creation_outcome.return_value is None: - with self._lock: - self._abort_and_notify(self._failure_outcome) - self._processing = False - elif consumer_creation_outcome.return_value.remote_error: - with self._lock: - self._abort_and_notify(interfaces.Outcome.RECEPTION_FAILURE) - self._processing = False - elif consumer_creation_outcome.return_value.abandoned: - with self._lock: - if self._pending_ingestion is not None: - self._abort_and_notify(self._failure_outcome) - self._processing = False - else: - wrapped_ingestion_consumer = _WrappedConsumer( - consumer_creation_outcome.return_value.consumer) - with self._lock: - self._wrapped_ingestion_consumer = wrapped_ingestion_consumer - payload, complete, moar = self._next() - if not moar: - self._processing = False - return - - self._process(wrapped_ingestion_consumer, payload, complete) - - self._pool.submit( - callable_util.with_exceptions_logged( - initialize, _constants.INTERNAL_ERROR_LOG_MESSAGE)) - self._processing = True - - def consume(self, payload): - if self._ingestion_complete: - self._abort_and_notify(self._failure_outcome) - elif self._pending_ingestion is not None: - if self._processing: - self._pending_ingestion.append(payload) - else: - self._pool.submit( - callable_util.with_exceptions_logged( - self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE), - self._wrapped_ingestion_consumer, payload, False) - self._processing = True - - def terminate(self): - if self._ingestion_complete: - self._abort_and_notify(self._failure_outcome) - else: - self._ingestion_complete = True - if self._pending_ingestion is not None and not self._processing: - self._pool.submit( - callable_util.with_exceptions_logged( - self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE), - self._wrapped_ingestion_consumer, None, True) - self._processing = True - - def consume_and_terminate(self, payload): - if self._ingestion_complete: - self._abort_and_notify(self._failure_outcome) - else: - self._ingestion_complete = True - if self._pending_ingestion is not None: - if self._processing: - self._pending_ingestion.append(payload) - else: - self._pool.submit( - callable_util.with_exceptions_logged( - self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE), - self._wrapped_ingestion_consumer, payload, True) - self._processing = True - - def abort(self): - """See _interfaces.IngestionManager.abort for specification.""" - self._abort_internal_only() - - -def front_ingestion_manager( - lock, pool, subscription, termination_manager, transmission_manager, - operation_context): - """Creates an IngestionManager appropriate for front-side use. - - Args: - lock: The operation-wide lock. - pool: A thread pool in which to execute customer code. - subscription: A interfaces.ServicedSubscription indicating the - customer's interest in the results of the operation. - termination_manager: The _interfaces.TerminationManager for the operation. - transmission_manager: The _interfaces.TransmissionManager for the - operation. - operation_context: A interfaces.OperationContext for the operation. - - Returns: - An IngestionManager appropriate for front-side use. - """ - ingestion_manager = _IngestionManager( - lock, pool, _FrontConsumerCreator(subscription, operation_context), - interfaces.Outcome.SERVICED_FAILURE, termination_manager, - transmission_manager) - ingestion_manager.start(None) - return ingestion_manager - - -def back_ingestion_manager( - lock, pool, servicer, termination_manager, transmission_manager, - operation_context, emission_consumer): - """Creates an IngestionManager appropriate for back-side use. - - Args: - lock: The operation-wide lock. - pool: A thread pool in which to execute customer code. - servicer: A interfaces.Servicer for servicing the operation. - termination_manager: The _interfaces.TerminationManager for the operation. - transmission_manager: The _interfaces.TransmissionManager for the - operation. - operation_context: A interfaces.OperationContext for the operation. - emission_consumer: The _interfaces.EmissionConsumer for the operation. - - Returns: - An IngestionManager appropriate for back-side use. - """ - ingestion_manager = _IngestionManager( - lock, pool, _BackConsumerCreator( - servicer, operation_context, emission_consumer), - interfaces.Outcome.SERVICER_FAILURE, termination_manager, - transmission_manager) - return ingestion_manager |