# Copyright 2015, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following disclaimer # in the documentation and/or other materials provided with the # distribution. # * Neither the name of Google Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """State and behavior for operation termination.""" import abc import six from grpc.framework.core import _constants from grpc.framework.core import _interfaces from grpc.framework.core import _utilities from grpc.framework.foundation import callable_util from grpc.framework.interfaces.base import base def _invocation_completion_predicate( unused_emission_complete, unused_transmission_complete, unused_reception_complete, ingestion_complete): return ingestion_complete def _service_completion_predicate( unused_emission_complete, transmission_complete, unused_reception_complete, ingestion_complete): return transmission_complete and ingestion_complete class TerminationManager(six.with_metaclass(abc.ABCMeta, _interfaces.TerminationManager)): """A _interfaces.TransmissionManager on which another manager may be set.""" @abc.abstractmethod def set_expiration_manager(self, expiration_manager): """Sets the expiration manager with which this manager will interact. Args: expiration_manager: The _interfaces.ExpirationManager associated with the current operation. """ raise NotImplementedError() class _TerminationManager(TerminationManager): """An implementation of TerminationManager.""" def __init__(self, predicate, action, pool): """Constructor. Args: predicate: One of _invocation_completion_predicate or _service_completion_predicate to be used to determine when the operation has completed. action: A behavior to pass the operation outcome's kind on operation termination. pool: A thread pool. """ self._predicate = predicate self._action = action self._pool = pool self._expiration_manager = None self._callbacks = [] self._code = None self._details = None self._emission_complete = False self._transmission_complete = False self._reception_complete = False self._ingestion_complete = False # The None-ness of outcome is the operation-wide record of whether and how # the operation has terminated. self.outcome = None def set_expiration_manager(self, expiration_manager): self._expiration_manager = expiration_manager def _terminate_internal_only(self, outcome): """Terminates the operation. Args: outcome: A base.Outcome describing the outcome of the operation. """ self.outcome = outcome callbacks = list(self._callbacks) self._callbacks = None act = callable_util.with_exceptions_logged( self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE) # TODO(issue 3202): Don't call the local application's callbacks if it has # previously shown a programming defect. if False and outcome.kind is base.Outcome.Kind.LOCAL_FAILURE: self._pool.submit(act, base.Outcome.Kind.LOCAL_FAILURE) else: def call_callbacks_and_act(callbacks, outcome): for callback in callbacks: callback_outcome = callable_util.call_logging_exceptions( callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE, outcome) if callback_outcome.exception is not None: act_outcome_kind = base.Outcome.Kind.LOCAL_FAILURE break else: act_outcome_kind = outcome.kind act(act_outcome_kind) self._pool.submit( callable_util.with_exceptions_logged( call_callbacks_and_act, _constants.INTERNAL_ERROR_LOG_MESSAGE), callbacks, outcome) def _terminate_and_notify(self, outcome): self._terminate_internal_only(outcome) self._expiration_manager.terminate() def _perhaps_complete(self): if self._predicate( self._emission_complete, self._transmission_complete, self._reception_complete, self._ingestion_complete): self._terminate_and_notify( _utilities.Outcome( base.Outcome.Kind.COMPLETED, self._code, self._details)) return True else: return False def is_active(self): """See _interfaces.TerminationManager.is_active for specification.""" return self.outcome is None def add_callback(self, callback): """See _interfaces.TerminationManager.add_callback for specification.""" if self.outcome is None: self._callbacks.append(callback) return None else: return self.outcome def emission_complete(self): """See superclass method for specification.""" if self.outcome is None: self._emission_complete = True self._perhaps_complete() def transmission_complete(self): """See superclass method for specification.""" if self.outcome is None: self._transmission_complete = True return self._perhaps_complete() else: return False def reception_complete(self, code, details): """See superclass method for specification.""" if self.outcome is None: self._reception_complete = True self._code = code self._details = details self._perhaps_complete() def ingestion_complete(self): """See superclass method for specification.""" if self.outcome is None: self._ingestion_complete = True self._perhaps_complete() def expire(self): """See _interfaces.TerminationManager.expire for specification.""" self._terminate_internal_only( _utilities.Outcome(base.Outcome.Kind.EXPIRED, None, None)) def abort(self, outcome): """See _interfaces.TerminationManager.abort for specification.""" self._terminate_and_notify(outcome) def invocation_termination_manager(action, pool): """Creates a TerminationManager appropriate for invocation-side use. Args: action: An action to call on operation termination. pool: A thread pool in which to execute the passed action and any termination callbacks that are registered during the operation. Returns: A TerminationManager appropriate for invocation-side use. """ return _TerminationManager(_invocation_completion_predicate, action, pool) def service_termination_manager(action, pool): """Creates a TerminationManager appropriate for service-side use. Args: action: An action to call on operation termination. pool: A thread pool in which to execute the passed action and any termination callbacks that are registered during the operation. Returns: A TerminationManager appropriate for service-side use. """ return _TerminationManager(_service_completion_predicate, action, pool)