aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/framework/base/_transmission.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/grpc/framework/base/_transmission.py')
-rw-r--r--src/python/grpcio/grpc/framework/base/_transmission.py429
1 files changed, 429 insertions, 0 deletions
diff --git a/src/python/grpcio/grpc/framework/base/_transmission.py b/src/python/grpcio/grpc/framework/base/_transmission.py
new file mode 100644
index 0000000000..6845129234
--- /dev/null
+++ b/src/python/grpcio/grpc/framework/base/_transmission.py
@@ -0,0 +1,429 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ticket transmission during an operation."""
+
+import abc
+
+from grpc.framework.base import _constants
+from grpc.framework.base import _interfaces
+from grpc.framework.base import interfaces
+from grpc.framework.foundation import callable_util
+
+_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
+
+_FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES = (
+ interfaces.Outcome.SERVICER_FAILURE,
+ )
+_BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES = (
+ interfaces.Outcome.CANCELLED,
+ interfaces.Outcome.SERVICED_FAILURE,
+ )
+
+_ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND = {
+ interfaces.Outcome.CANCELLED:
+ interfaces.FrontToBackTicket.Kind.CANCELLATION,
+ interfaces.Outcome.EXPIRED:
+ interfaces.FrontToBackTicket.Kind.EXPIRATION,
+ interfaces.Outcome.RECEPTION_FAILURE:
+ interfaces.FrontToBackTicket.Kind.RECEPTION_FAILURE,
+ interfaces.Outcome.TRANSMISSION_FAILURE:
+ interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE,
+ interfaces.Outcome.SERVICED_FAILURE:
+ interfaces.FrontToBackTicket.Kind.SERVICED_FAILURE,
+ interfaces.Outcome.SERVICER_FAILURE:
+ interfaces.FrontToBackTicket.Kind.SERVICER_FAILURE,
+}
+
+_ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND = {
+ interfaces.Outcome.CANCELLED:
+ interfaces.BackToFrontTicket.Kind.CANCELLATION,
+ interfaces.Outcome.EXPIRED:
+ interfaces.BackToFrontTicket.Kind.EXPIRATION,
+ interfaces.Outcome.RECEPTION_FAILURE:
+ interfaces.BackToFrontTicket.Kind.RECEPTION_FAILURE,
+ interfaces.Outcome.TRANSMISSION_FAILURE:
+ interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE,
+ interfaces.Outcome.SERVICED_FAILURE:
+ interfaces.BackToFrontTicket.Kind.SERVICED_FAILURE,
+ interfaces.Outcome.SERVICER_FAILURE:
+ interfaces.BackToFrontTicket.Kind.SERVICER_FAILURE,
+}
+
+
+class _Ticketizer(object):
+ """Common specification of different ticket-creating behavior."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def ticketize(self, operation_id, sequence_number, payload, complete):
+ """Creates a ticket indicating ordinary operation progress.
+
+ Args:
+ operation_id: The operation ID for the current operation.
+ sequence_number: A sequence number for the ticket.
+ payload: A customer payload object. May be None if sequence_number is
+ zero or complete is true.
+ complete: A boolean indicating whether or not the ticket should describe
+ itself as (but for a later indication of operation abortion) the last
+ ticket to be sent.
+
+ Returns:
+ An object of an appropriate type suitable for transmission to the other
+ side of the operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def ticketize_abortion(self, operation_id, sequence_number, outcome):
+ """Creates a ticket indicating that the operation is aborted.
+
+ Args:
+ operation_id: The operation ID for the current operation.
+ sequence_number: A sequence number for the ticket.
+ outcome: An interfaces.Outcome value describing the operation abortion.
+
+ Returns:
+ An object of an appropriate type suitable for transmission to the other
+ side of the operation, or None if transmission is not appropriate for
+ the given outcome.
+ """
+ raise NotImplementedError()
+
+
+class _FrontTicketizer(_Ticketizer):
+ """Front-side ticket-creating behavior."""
+
+ def __init__(self, name, subscription_kind, trace_id, timeout):
+ """Constructor.
+
+ Args:
+ name: The name of the operation.
+ subscription_kind: An interfaces.ServicedSubscription.Kind value
+ describing the interest the front has in tickets sent from the back.
+ trace_id: A uuid.UUID identifying a set of related operations to which
+ this operation belongs.
+ timeout: A length of time in seconds to allow for the entire operation.
+ """
+ self._name = name
+ self._subscription_kind = subscription_kind
+ self._trace_id = trace_id
+ self._timeout = timeout
+
+ def ticketize(self, operation_id, sequence_number, payload, complete):
+ """See _Ticketizer.ticketize for specification."""
+ if sequence_number:
+ if complete:
+ kind = interfaces.FrontToBackTicket.Kind.COMPLETION
+ else:
+ kind = interfaces.FrontToBackTicket.Kind.CONTINUATION
+ return interfaces.FrontToBackTicket(
+ operation_id, sequence_number, kind, self._name,
+ self._subscription_kind, self._trace_id, payload, self._timeout)
+ else:
+ if complete:
+ kind = interfaces.FrontToBackTicket.Kind.ENTIRE
+ else:
+ kind = interfaces.FrontToBackTicket.Kind.COMMENCEMENT
+ return interfaces.FrontToBackTicket(
+ operation_id, 0, kind, self._name, self._subscription_kind,
+ self._trace_id, payload, self._timeout)
+
+ def ticketize_abortion(self, operation_id, sequence_number, outcome):
+ """See _Ticketizer.ticketize_abortion for specification."""
+ if outcome in _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES:
+ return None
+ else:
+ kind = _ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND[outcome]
+ return interfaces.FrontToBackTicket(
+ operation_id, sequence_number, kind, None, None, None, None, None)
+
+
+class _BackTicketizer(_Ticketizer):
+ """Back-side ticket-creating behavior."""
+
+ def ticketize(self, operation_id, sequence_number, payload, complete):
+ """See _Ticketizer.ticketize for specification."""
+ if complete:
+ kind = interfaces.BackToFrontTicket.Kind.COMPLETION
+ else:
+ kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
+ return interfaces.BackToFrontTicket(
+ operation_id, sequence_number, kind, payload)
+
+ def ticketize_abortion(self, operation_id, sequence_number, outcome):
+ """See _Ticketizer.ticketize_abortion for specification."""
+ if outcome in _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES:
+ return None
+ else:
+ kind = _ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND[outcome]
+ return interfaces.BackToFrontTicket(
+ operation_id, sequence_number, kind, None)
+
+
+class TransmissionManager(_interfaces.TransmissionManager):
+ """A _interfaces.TransmissionManager on which other managers may be set."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_ingestion_and_expiration_managers(
+ self, ingestion_manager, expiration_manager):
+ """Sets two of the other managers with which this manager may interact.
+
+ Args:
+ ingestion_manager: The _interfaces.IngestionManager associated with the
+ current operation.
+ expiration_manager: The _interfaces.ExpirationManager associated with the
+ current operation.
+ """
+ raise NotImplementedError()
+
+
+class _EmptyTransmissionManager(TransmissionManager):
+ """A completely no-operative _interfaces.TransmissionManager."""
+
+ def set_ingestion_and_expiration_managers(
+ self, ingestion_manager, expiration_manager):
+ """See overriden method for specification."""
+
+ def inmit(self, emission, complete):
+ """See _interfaces.TransmissionManager.inmit for specification."""
+
+ def abort(self, outcome):
+ """See _interfaces.TransmissionManager.abort for specification."""
+
+
+class _TransmittingTransmissionManager(TransmissionManager):
+ """A TransmissionManager implementation that sends tickets."""
+
+ def __init__(
+ self, lock, pool, callback, operation_id, ticketizer,
+ termination_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting tickets will be
+ performed.
+ callback: A callable that accepts tickets and sends them to the other side
+ of the operation.
+ operation_id: The operation's ID.
+ ticketizer: A _Ticketizer for ticket creation.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+ """
+ self._lock = lock
+ self._pool = pool
+ self._callback = callback
+ self._operation_id = operation_id
+ self._ticketizer = ticketizer
+ self._termination_manager = termination_manager
+ self._ingestion_manager = None
+ self._expiration_manager = None
+
+ self._emissions = []
+ self._emission_complete = False
+ self._outcome = None
+ self._lowest_unused_sequence_number = 0
+ self._transmitting = False
+
+ def set_ingestion_and_expiration_managers(
+ self, ingestion_manager, expiration_manager):
+ """See overridden method for specification."""
+ self._ingestion_manager = ingestion_manager
+ self._expiration_manager = expiration_manager
+
+ def _lead_ticket(self, emission, complete):
+ """Creates a ticket suitable for leading off the transmission loop.
+
+ Args:
+ emission: A customer payload object to be sent to the other side of the
+ operation.
+ complete: Whether or not the sequence of customer payloads ends with
+ the passed object.
+
+ Returns:
+ A ticket with which to lead off the transmission loop.
+ """
+ sequence_number = self._lowest_unused_sequence_number
+ self._lowest_unused_sequence_number += 1
+ return self._ticketizer.ticketize(
+ self._operation_id, sequence_number, emission, complete)
+
+ def _abortive_response_ticket(self, outcome):
+ """Creates a ticket indicating operation abortion.
+
+ Args:
+ outcome: An interfaces.Outcome value describing operation abortion.
+
+ Returns:
+ A ticket indicating operation abortion.
+ """
+ ticket = self._ticketizer.ticketize_abortion(
+ self._operation_id, self._lowest_unused_sequence_number, outcome)
+ if ticket is None:
+ return None
+ else:
+ self._lowest_unused_sequence_number += 1
+ return ticket
+
+ def _next_ticket(self):
+ """Creates the next ticket to be sent to the other side of the operation.
+
+ Returns:
+ A (completed, ticket) tuple comprised of a boolean indicating whether or
+ not the sequence of tickets has completed normally and a ticket to send
+ to the other side if the sequence of tickets hasn't completed. The tuple
+ will never have both a True first element and a non-None second element.
+ """
+ if self._emissions is None:
+ return False, None
+ elif self._outcome is None:
+ if self._emissions:
+ payload = self._emissions.pop(0)
+ complete = self._emission_complete and not self._emissions
+ sequence_number = self._lowest_unused_sequence_number
+ self._lowest_unused_sequence_number += 1
+ return complete, self._ticketizer.ticketize(
+ self._operation_id, sequence_number, payload, complete)
+ else:
+ return self._emission_complete, None
+ else:
+ ticket = self._abortive_response_ticket(self._outcome)
+ self._emissions = None
+ return False, None if ticket is None else ticket
+
+ def _transmit(self, ticket):
+ """Commences the transmission loop sending tickets.
+
+ Args:
+ ticket: A ticket to be sent to the other side of the operation.
+ """
+ def transmit(ticket):
+ while True:
+ transmission_outcome = callable_util.call_logging_exceptions(
+ self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket)
+ if transmission_outcome.exception is None:
+ with self._lock:
+ complete, ticket = self._next_ticket()
+ if ticket is None:
+ if complete:
+ self._termination_manager.transmission_complete()
+ self._transmitting = False
+ return
+ else:
+ with self._lock:
+ self._emissions = None
+ self._termination_manager.abort(
+ interfaces.Outcome.TRANSMISSION_FAILURE)
+ self._ingestion_manager.abort()
+ self._expiration_manager.abort()
+ self._transmitting = False
+ return
+
+ self._pool.submit(callable_util.with_exceptions_logged(
+ transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket)
+ self._transmitting = True
+
+ def inmit(self, emission, complete):
+ """See _interfaces.TransmissionManager.inmit for specification."""
+ if self._emissions is not None and self._outcome is None:
+ self._emission_complete = complete
+ if self._transmitting:
+ self._emissions.append(emission)
+ else:
+ self._transmit(self._lead_ticket(emission, complete))
+
+ def abort(self, outcome):
+ """See _interfaces.TransmissionManager.abort for specification."""
+ if self._emissions is not None and self._outcome is None:
+ self._outcome = outcome
+ if not self._transmitting:
+ ticket = self._abortive_response_ticket(outcome)
+ self._emissions = None
+ if ticket is not None:
+ self._transmit(ticket)
+
+
+def front_transmission_manager(
+ lock, pool, callback, operation_id, name, subscription_kind, trace_id,
+ timeout, termination_manager):
+ """Creates a TransmissionManager appropriate for front-side use.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting tickets will be
+ performed.
+ callback: A callable that accepts tickets and sends them to the other side
+ of the operation.
+ operation_id: The operation's ID.
+ name: The name of the operation.
+ subscription_kind: An interfaces.ServicedSubscription.Kind value
+ describing the interest the front has in tickets sent from the back.
+ trace_id: A uuid.UUID identifying a set of related operations to which
+ this operation belongs.
+ timeout: A length of time in seconds to allow for the entire operation.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+
+ Returns:
+ A TransmissionManager appropriate for front-side use.
+ """
+ return _TransmittingTransmissionManager(
+ lock, pool, callback, operation_id, _FrontTicketizer(
+ name, subscription_kind, trace_id, timeout),
+ termination_manager)
+
+
+def back_transmission_manager(
+ lock, pool, callback, operation_id, termination_manager,
+ subscription_kind):
+ """Creates a TransmissionManager appropriate for back-side use.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting tickets will be
+ performed.
+ callback: A callable that accepts tickets and sends them to the other side
+ of the operation.
+ operation_id: The operation's ID.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+ subscription_kind: An interfaces.ServicedSubscription.Kind value
+ describing the interest the front has in tickets sent from the back.
+
+ Returns:
+ A TransmissionManager appropriate for back-side use.
+ """
+ if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
+ return _EmptyTransmissionManager()
+ else:
+ return _TransmittingTransmissionManager(
+ lock, pool, callback, operation_id, _BackTicketizer(),
+ termination_manager)