aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/_framework/base/packets/_transmission.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/_framework/base/packets/_transmission.py')
-rw-r--r--src/python/_framework/base/packets/_transmission.py393
1 files changed, 393 insertions, 0 deletions
diff --git a/src/python/_framework/base/packets/_transmission.py b/src/python/_framework/base/packets/_transmission.py
new file mode 100644
index 0000000000..006128774d
--- /dev/null
+++ b/src/python/_framework/base/packets/_transmission.py
@@ -0,0 +1,393 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for packet transmission during an operation."""
+
+import abc
+
+from _framework.base import interfaces
+from _framework.base.packets import _constants
+from _framework.base.packets import _interfaces
+from _framework.base.packets import packets
+from _framework.foundation import callable_util
+
+_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
+
+_FRONT_TO_BACK_NO_TRANSMISSION_KINDS = (
+ packets.Kind.SERVICER_FAILURE,
+ )
+_BACK_TO_FRONT_NO_TRANSMISSION_KINDS = (
+ packets.Kind.CANCELLATION,
+ packets.Kind.SERVICED_FAILURE,
+ )
+
+
+class _Packetizer(object):
+ """Common specification of different packet-creating behavior."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def packetize(self, operation_id, sequence_number, payload, complete):
+ """Creates a packet indicating ordinary operation progress.
+
+ Args:
+ operation_id: The operation ID for the current operation.
+ sequence_number: A sequence number for the packet.
+ payload: A customer payload object. May be None if sequence_number is
+ zero or complete is true.
+ complete: A boolean indicating whether or not the packet should describe
+ itself as (but for a later indication of operation abortion) the last
+ packet to be sent.
+
+ Returns:
+ An object of an appropriate type suitable for transmission to the other
+ side of the operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def packetize_abortion(self, operation_id, sequence_number, kind):
+ """Creates a packet indicating that the operation is aborted.
+
+ Args:
+ operation_id: The operation ID for the current operation.
+ sequence_number: A sequence number for the packet.
+ kind: One of the values of packets.Kind indicating operational abortion.
+
+ Returns:
+ An object of an appropriate type suitable for transmission to the other
+ side of the operation, or None if transmission is not appropriate for
+ the given kind.
+ """
+ raise NotImplementedError()
+
+
+class _FrontPacketizer(_Packetizer):
+ """Front-side packet-creating behavior."""
+
+ def __init__(self, name, subscription, trace_id, timeout):
+ """Constructor.
+
+ Args:
+ name: The name of the operation.
+ subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
+ interfaces.NONE describing the interest the front has in packets sent
+ from the back.
+ trace_id: A uuid.UUID identifying a set of related operations to which
+ this operation belongs.
+ timeout: A length of time in seconds to allow for the entire operation.
+ """
+ self._name = name
+ self._subscription = subscription
+ self._trace_id = trace_id
+ self._timeout = timeout
+
+ def packetize(self, operation_id, sequence_number, payload, complete):
+ """See _Packetizer.packetize for specification."""
+ if sequence_number:
+ return packets.FrontToBackPacket(
+ operation_id, sequence_number,
+ packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION,
+ self._name, self._subscription, self._trace_id, payload,
+ self._timeout)
+ else:
+ return packets.FrontToBackPacket(
+ operation_id, 0,
+ packets.Kind.ENTIRE if complete else packets.Kind.COMMENCEMENT,
+ self._name, self._subscription, self._trace_id, payload,
+ self._timeout)
+
+ def packetize_abortion(self, operation_id, sequence_number, kind):
+ """See _Packetizer.packetize_abortion for specification."""
+ if kind in _FRONT_TO_BACK_NO_TRANSMISSION_KINDS:
+ return None
+ else:
+ return packets.FrontToBackPacket(
+ operation_id, sequence_number, kind, None, None, None, None, None)
+
+
+class _BackPacketizer(_Packetizer):
+ """Back-side packet-creating behavior."""
+
+ def packetize(self, operation_id, sequence_number, payload, complete):
+ """See _Packetizer.packetize for specification."""
+ return packets.BackToFrontPacket(
+ operation_id, sequence_number,
+ packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION,
+ payload)
+
+ def packetize_abortion(self, operation_id, sequence_number, kind):
+ """See _Packetizer.packetize_abortion for specification."""
+ if kind in _BACK_TO_FRONT_NO_TRANSMISSION_KINDS:
+ return None
+ else:
+ return packets.BackToFrontPacket(
+ operation_id, sequence_number, kind, None)
+
+
+class TransmissionManager(_interfaces.TransmissionManager):
+ """A _interfaces.TransmissionManager on which other managers may be set."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_ingestion_and_expiration_managers(
+ self, ingestion_manager, expiration_manager):
+ """Sets two of the other managers with which this manager may interact.
+
+ Args:
+ ingestion_manager: The _interfaces.IngestionManager associated with the
+ current operation.
+ expiration_manager: The _interfaces.ExpirationManager associated with the
+ current operation.
+ """
+ raise NotImplementedError()
+
+
+class _EmptyTransmissionManager(TransmissionManager):
+ """A completely no-operative _interfaces.TransmissionManager."""
+
+ def set_ingestion_and_expiration_managers(
+ self, ingestion_manager, expiration_manager):
+ """See overriden method for specification."""
+
+ def inmit(self, emission, complete):
+ """See _interfaces.TransmissionManager.inmit for specification."""
+
+ def abort(self, category):
+ """See _interfaces.TransmissionManager.abort for specification."""
+
+
+class _TransmittingTransmissionManager(TransmissionManager):
+ """A TransmissionManager implementation that sends packets."""
+
+ def __init__(
+ self, lock, pool, callback, operation_id, packetizer,
+ termination_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting packets will be
+ performed.
+ callback: A callable that accepts packets and sends them to the other side
+ of the operation.
+ operation_id: The operation's ID.
+ packetizer: A _Packetizer for packet creation.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+ """
+ self._lock = lock
+ self._pool = pool
+ self._callback = callback
+ self._operation_id = operation_id
+ self._packetizer = packetizer
+ self._termination_manager = termination_manager
+ self._ingestion_manager = None
+ self._expiration_manager = None
+
+ self._emissions = []
+ self._emission_complete = False
+ self._kind = None
+ self._lowest_unused_sequence_number = 0
+ self._transmitting = False
+
+ def set_ingestion_and_expiration_managers(
+ self, ingestion_manager, expiration_manager):
+ """See overridden method for specification."""
+ self._ingestion_manager = ingestion_manager
+ self._expiration_manager = expiration_manager
+
+ def _lead_packet(self, emission, complete):
+ """Creates a packet suitable for leading off the transmission loop.
+
+ Args:
+ emission: A customer payload object to be sent to the other side of the
+ operation.
+ complete: Whether or not the sequence of customer payloads ends with
+ the passed object.
+
+ Returns:
+ A packet with which to lead off the transmission loop.
+ """
+ sequence_number = self._lowest_unused_sequence_number
+ self._lowest_unused_sequence_number += 1
+ return self._packetizer.packetize(
+ self._operation_id, sequence_number, emission, complete)
+
+ def _abortive_response_packet(self, kind):
+ """Creates a packet indicating operation abortion.
+
+ Args:
+ kind: One of the values of packets.Kind indicating operational abortion.
+
+ Returns:
+ A packet indicating operation abortion.
+ """
+ packet = self._packetizer.packetize_abortion(
+ self._operation_id, self._lowest_unused_sequence_number, kind)
+ if packet is None:
+ return None
+ else:
+ self._lowest_unused_sequence_number += 1
+ return packet
+
+ def _next_packet(self):
+ """Creates the next packet to be sent to the other side of the operation.
+
+ Returns:
+ A (completed, packet) tuple comprised of a boolean indicating whether or
+ not the sequence of packets has completed normally and a packet to send
+ to the other side if the sequence of packets hasn't completed. The tuple
+ will never have both a True first element and a non-None second element.
+ """
+ if self._emissions is None:
+ return False, None
+ elif self._kind is None:
+ if self._emissions:
+ payload = self._emissions.pop(0)
+ complete = self._emission_complete and not self._emissions
+ sequence_number = self._lowest_unused_sequence_number
+ self._lowest_unused_sequence_number += 1
+ return complete, self._packetizer.packetize(
+ self._operation_id, sequence_number, payload, complete)
+ else:
+ return self._emission_complete, None
+ else:
+ packet = self._abortive_response_packet(self._kind)
+ self._emissions = None
+ return False, None if packet is None else packet
+
+ def _transmit(self, packet):
+ """Commences the transmission loop sending packets.
+
+ Args:
+ packet: A packet to be sent to the other side of the operation.
+ """
+ def transmit(packet):
+ while True:
+ transmission_outcome = callable_util.call_logging_exceptions(
+ self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, packet)
+ if transmission_outcome.exception is None:
+ with self._lock:
+ complete, packet = self._next_packet()
+ if packet is None:
+ if complete:
+ self._termination_manager.transmission_complete()
+ self._transmitting = False
+ return
+ else:
+ with self._lock:
+ self._emissions = None
+ self._termination_manager.abort(packets.Kind.TRANSMISSION_FAILURE)
+ self._ingestion_manager.abort()
+ self._expiration_manager.abort()
+ self._transmitting = False
+ return
+
+ self._pool.submit(callable_util.with_exceptions_logged(
+ transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), packet)
+ self._transmitting = True
+
+ def inmit(self, emission, complete):
+ """See _interfaces.TransmissionManager.inmit for specification."""
+ if self._emissions is not None and self._kind is None:
+ self._emission_complete = complete
+ if self._transmitting:
+ self._emissions.append(emission)
+ else:
+ self._transmit(self._lead_packet(emission, complete))
+
+ def abort(self, kind):
+ """See _interfaces.TransmissionManager.abort for specification."""
+ if self._emissions is not None and self._kind is None:
+ self._kind = kind
+ if not self._transmitting:
+ packet = self._abortive_response_packet(kind)
+ self._emissions = None
+ if packet is not None:
+ self._transmit(packet)
+
+
+def front_transmission_manager(
+ lock, pool, callback, operation_id, name, subscription, trace_id, timeout,
+ termination_manager):
+ """Creates a TransmissionManager appropriate for front-side use.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting packets will be
+ performed.
+ callback: A callable that accepts packets and sends them to the other side
+ of the operation.
+ operation_id: The operation's ID.
+ name: The name of the operation.
+ subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
+ interfaces.NONE describing the interest the front has in packets sent
+ from the back.
+ trace_id: A uuid.UUID identifying a set of related operations to which
+ this operation belongs.
+ timeout: A length of time in seconds to allow for the entire operation.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+
+ Returns:
+ A TransmissionManager appropriate for front-side use.
+ """
+ return _TransmittingTransmissionManager(
+ lock, pool, callback, operation_id, _FrontPacketizer(
+ name, subscription, trace_id, timeout),
+ termination_manager)
+
+
+def back_transmission_manager(
+ lock, pool, callback, operation_id, termination_manager, subscription):
+ """Creates a TransmissionManager appropriate for back-side use.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting packets will be
+ performed.
+ callback: A callable that accepts packets and sends them to the other side
+ of the operation.
+ operation_id: The operation's ID.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+ subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
+ interfaces.NONE describing the interest the front has in packets sent from
+ the back.
+
+ Returns:
+ A TransmissionManager appropriate for back-side use.
+ """
+ if subscription == interfaces.NONE:
+ return _EmptyTransmissionManager()
+ else:
+ return _TransmittingTransmissionManager(
+ lock, pool, callback, operation_id, _BackPacketizer(),
+ termination_manager)