# Copyright 2015, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following disclaimer # in the documentation and/or other materials provided with the # distribution. # * Neither the name of Google Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """State and behavior for ticket transmission during an operation.""" import abc import six from grpc.framework.base import _constants from grpc.framework.base import _interfaces from grpc.framework.base import interfaces from grpc.framework.foundation import callable_util _TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES = ( interfaces.Outcome.SERVICER_FAILURE, ) _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES = ( interfaces.Outcome.CANCELLED, interfaces.Outcome.SERVICED_FAILURE, ) _ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND = { interfaces.Outcome.CANCELLED: interfaces.FrontToBackTicket.Kind.CANCELLATION, interfaces.Outcome.EXPIRED: interfaces.FrontToBackTicket.Kind.EXPIRATION, interfaces.Outcome.RECEPTION_FAILURE: interfaces.FrontToBackTicket.Kind.RECEPTION_FAILURE, interfaces.Outcome.TRANSMISSION_FAILURE: interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, interfaces.Outcome.SERVICED_FAILURE: interfaces.FrontToBackTicket.Kind.SERVICED_FAILURE, interfaces.Outcome.SERVICER_FAILURE: interfaces.FrontToBackTicket.Kind.SERVICER_FAILURE, } _ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND = { interfaces.Outcome.CANCELLED: interfaces.BackToFrontTicket.Kind.CANCELLATION, interfaces.Outcome.EXPIRED: interfaces.BackToFrontTicket.Kind.EXPIRATION, interfaces.Outcome.RECEPTION_FAILURE: interfaces.BackToFrontTicket.Kind.RECEPTION_FAILURE, interfaces.Outcome.TRANSMISSION_FAILURE: interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE, interfaces.Outcome.SERVICED_FAILURE: interfaces.BackToFrontTicket.Kind.SERVICED_FAILURE, interfaces.Outcome.SERVICER_FAILURE: interfaces.BackToFrontTicket.Kind.SERVICER_FAILURE, } class _Ticketizer(six.with_metaclass(abc.ABCMeta)): """Common specification of different ticket-creating behavior.""" @abc.abstractmethod def ticketize(self, operation_id, sequence_number, payload, complete): """Creates a ticket indicating ordinary operation progress. Args: operation_id: The operation ID for the current operation. sequence_number: A sequence number for the ticket. payload: A customer payload object. May be None if sequence_number is zero or complete is true. complete: A boolean indicating whether or not the ticket should describe itself as (but for a later indication of operation abortion) the last ticket to be sent. Returns: An object of an appropriate type suitable for transmission to the other side of the operation. """ raise NotImplementedError() @abc.abstractmethod def ticketize_abortion(self, operation_id, sequence_number, outcome): """Creates a ticket indicating that the operation is aborted. Args: operation_id: The operation ID for the current operation. sequence_number: A sequence number for the ticket. outcome: An interfaces.Outcome value describing the operation abortion. Returns: An object of an appropriate type suitable for transmission to the other side of the operation, or None if transmission is not appropriate for the given outcome. """ raise NotImplementedError() class _FrontTicketizer(_Ticketizer): """Front-side ticket-creating behavior.""" def __init__(self, name, subscription_kind, trace_id, timeout): """Constructor. Args: name: The name of the operation. subscription_kind: An interfaces.ServicedSubscription.Kind value describing the interest the front has in tickets sent from the back. trace_id: A uuid.UUID identifying a set of related operations to which this operation belongs. timeout: A length of time in seconds to allow for the entire operation. """ self._name = name self._subscription_kind = subscription_kind self._trace_id = trace_id self._timeout = timeout def ticketize(self, operation_id, sequence_number, payload, complete): """See _Ticketizer.ticketize for specification.""" if sequence_number: if complete: kind = interfaces.FrontToBackTicket.Kind.COMPLETION else: kind = interfaces.FrontToBackTicket.Kind.CONTINUATION return interfaces.FrontToBackTicket( operation_id, sequence_number, kind, self._name, self._subscription_kind, self._trace_id, payload, self._timeout) else: if complete: kind = interfaces.FrontToBackTicket.Kind.ENTIRE else: kind = interfaces.FrontToBackTicket.Kind.COMMENCEMENT return interfaces.FrontToBackTicket( operation_id, 0, kind, self._name, self._subscription_kind, self._trace_id, payload, self._timeout) def ticketize_abortion(self, operation_id, sequence_number, outcome): """See _Ticketizer.ticketize_abortion for specification.""" if outcome in _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES: return None else: kind = _ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND[outcome] return interfaces.FrontToBackTicket( operation_id, sequence_number, kind, None, None, None, None, None) class _BackTicketizer(_Ticketizer): """Back-side ticket-creating behavior.""" def ticketize(self, operation_id, sequence_number, payload, complete): """See _Ticketizer.ticketize for specification.""" if complete: kind = interfaces.BackToFrontTicket.Kind.COMPLETION else: kind = interfaces.BackToFrontTicket.Kind.CONTINUATION return interfaces.BackToFrontTicket( operation_id, sequence_number, kind, payload) def ticketize_abortion(self, operation_id, sequence_number, outcome): """See _Ticketizer.ticketize_abortion for specification.""" if outcome in _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES: return None else: kind = _ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND[outcome] return interfaces.BackToFrontTicket( operation_id, sequence_number, kind, None) class TransmissionManager(six.with_metaclass(abc.ABCMeta, _interfaces.TransmissionManager)): """A _interfaces.TransmissionManager on which other managers may be set.""" @abc.abstractmethod def set_ingestion_and_expiration_managers( self, ingestion_manager, expiration_manager): """Sets two of the other managers with which this manager may interact. Args: ingestion_manager: The _interfaces.IngestionManager associated with the current operation. expiration_manager: The _interfaces.ExpirationManager associated with the current operation. """ raise NotImplementedError() class _EmptyTransmissionManager(TransmissionManager): """A completely no-operative _interfaces.TransmissionManager.""" def set_ingestion_and_expiration_managers( self, ingestion_manager, expiration_manager): """See overriden method for specification.""" def inmit(self, emission, complete): """See _interfaces.TransmissionManager.inmit for specification.""" def abort(self, outcome): """See _interfaces.TransmissionManager.abort for specification.""" class _TransmittingTransmissionManager(TransmissionManager): """A TransmissionManager implementation that sends tickets.""" def __init__( self, lock, pool, callback, operation_id, ticketizer, termination_manager): """Constructor. Args: lock: The operation-servicing-wide lock object. pool: A thread pool in which the work of transmitting tickets will be performed. callback: A callable that accepts tickets and sends them to the other side of the operation. operation_id: The operation's ID. ticketizer: A _Ticketizer for ticket creation. termination_manager: The _interfaces.TerminationManager associated with this operation. """ self._lock = lock self._pool = pool self._callback = callback self._operation_id = operation_id self._ticketizer = ticketizer self._termination_manager = termination_manager self._ingestion_manager = None self._expiration_manager = None self._emissions = [] self._emission_complete = False self._outcome = None self._lowest_unused_sequence_number = 0 self._transmitting = False def set_ingestion_and_expiration_managers( self, ingestion_manager, expiration_manager): """See overridden method for specification.""" self._ingestion_manager = ingestion_manager self._expiration_manager = expiration_manager def _lead_ticket(self, emission, complete): """Creates a ticket suitable for leading off the transmission loop. Args: emission: A customer payload object to be sent to the other side of the operation. complete: Whether or not the sequence of customer payloads ends with the passed object. Returns: A ticket with which to lead off the transmission loop. """ sequence_number = self._lowest_unused_sequence_number self._lowest_unused_sequence_number += 1 return self._ticketizer.ticketize( self._operation_id, sequence_number, emission, complete) def _abortive_response_ticket(self, outcome): """Creates a ticket indicating operation abortion. Args: outcome: An interfaces.Outcome value describing operation abortion. Returns: A ticket indicating operation abortion. """ ticket = self._ticketizer.ticketize_abortion( self._operation_id, self._lowest_unused_sequence_number, outcome) if ticket is None: return None else: self._lowest_unused_sequence_number += 1 return ticket def _next_ticket(self): """Creates the next ticket to be sent to the other side of the operation. Returns: A (completed, ticket) tuple comprised of a boolean indicating whether or not the sequence of tickets has completed normally and a ticket to send to the other side if the sequence of tickets hasn't completed. The tuple will never have both a True first element and a non-None second element. """ if self._emissions is None: return False, None elif self._outcome is None: if self._emissions: payload = self._emissions.pop(0) complete = self._emission_complete and not self._emissions sequence_number = self._lowest_unused_sequence_number self._lowest_unused_sequence_number += 1 return complete, self._ticketizer.ticketize( self._operation_id, sequence_number, payload, complete) else: return self._emission_complete, None else: ticket = self._abortive_response_ticket(self._outcome) self._emissions = None return False, None if ticket is None else ticket def _transmit(self, ticket): """Commences the transmission loop sending tickets. Args: ticket: A ticket to be sent to the other side of the operation. """ def transmit(ticket): while True: transmission_outcome = callable_util.call_logging_exceptions( self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket) if transmission_outcome.exception is None: with self._lock: complete, ticket = self._next_ticket() if ticket is None: if complete: self._termination_manager.transmission_complete() self._transmitting = False return else: with self._lock: self._emissions = None self._termination_manager.abort( interfaces.Outcome.TRANSMISSION_FAILURE) self._ingestion_manager.abort() self._expiration_manager.abort() self._transmitting = False return self._pool.submit(callable_util.with_exceptions_logged( transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket) self._transmitting = True def inmit(self, emission, complete): """See _interfaces.TransmissionManager.inmit for specification.""" if self._emissions is not None and self._outcome is None: self._emission_complete = complete if self._transmitting: self._emissions.append(emission) else: self._transmit(self._lead_ticket(emission, complete)) def abort(self, outcome): """See _interfaces.TransmissionManager.abort for specification.""" if self._emissions is not None and self._outcome is None: self._outcome = outcome if not self._transmitting: ticket = self._abortive_response_ticket(outcome) self._emissions = None if ticket is not None: self._transmit(ticket) def front_transmission_manager( lock, pool, callback, operation_id, name, subscription_kind, trace_id, timeout, termination_manager): """Creates a TransmissionManager appropriate for front-side use. Args: lock: The operation-servicing-wide lock object. pool: A thread pool in which the work of transmitting tickets will be performed. callback: A callable that accepts tickets and sends them to the other side of the operation. operation_id: The operation's ID. name: The name of the operation. subscription_kind: An interfaces.ServicedSubscription.Kind value describing the interest the front has in tickets sent from the back. trace_id: A uuid.UUID identifying a set of related operations to which this operation belongs. timeout: A length of time in seconds to allow for the entire operation. termination_manager: The _interfaces.TerminationManager associated with this operation. Returns: A TransmissionManager appropriate for front-side use. """ return _TransmittingTransmissionManager( lock, pool, callback, operation_id, _FrontTicketizer( name, subscription_kind, trace_id, timeout), termination_manager) def back_transmission_manager( lock, pool, callback, operation_id, termination_manager, subscription_kind): """Creates a TransmissionManager appropriate for back-side use. Args: lock: The operation-servicing-wide lock object. pool: A thread pool in which the work of transmitting tickets will be performed. callback: A callable that accepts tickets and sends them to the other side of the operation. operation_id: The operation's ID. termination_manager: The _interfaces.TerminationManager associated with this operation. subscription_kind: An interfaces.ServicedSubscription.Kind value describing the interest the front has in tickets sent from the back. Returns: A TransmissionManager appropriate for back-side use. """ if subscription_kind is interfaces.ServicedSubscription.Kind.NONE: return _EmptyTransmissionManager() else: return _TransmittingTransmissionManager( lock, pool, callback, operation_id, _BackTicketizer(), termination_manager)