aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/_framework/base/packets
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/_framework/base/packets')
-rw-r--r--src/python/_framework/base/packets/__init__.py0
-rw-r--r--src/python/_framework/base/packets/_cancellation.py64
-rw-r--r--src/python/_framework/base/packets/_constants.py32
-rw-r--r--src/python/_framework/base/packets/_context.py99
-rw-r--r--src/python/_framework/base/packets/_emission.py126
-rw-r--r--src/python/_framework/base/packets/_ends.py408
-rw-r--r--src/python/_framework/base/packets/_expiration.py158
-rw-r--r--src/python/_framework/base/packets/_ingestion.py440
-rw-r--r--src/python/_framework/base/packets/_interfaces.py269
-rw-r--r--src/python/_framework/base/packets/_reception.py394
-rw-r--r--src/python/_framework/base/packets/_termination.py201
-rw-r--r--src/python/_framework/base/packets/_transmission.py393
-rw-r--r--src/python/_framework/base/packets/implementations.py77
-rw-r--r--src/python/_framework/base/packets/implementations_test.py80
-rw-r--r--src/python/_framework/base/packets/in_memory.py108
-rw-r--r--src/python/_framework/base/packets/interfaces.py84
-rw-r--r--src/python/_framework/base/packets/null.py56
-rw-r--r--src/python/_framework/base/packets/packets.py112
18 files changed, 3101 insertions, 0 deletions
diff --git a/src/python/_framework/base/packets/__init__.py b/src/python/_framework/base/packets/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/src/python/_framework/base/packets/__init__.py
diff --git a/src/python/_framework/base/packets/_cancellation.py b/src/python/_framework/base/packets/_cancellation.py
new file mode 100644
index 0000000000..49172d1b97
--- /dev/null
+++ b/src/python/_framework/base/packets/_cancellation.py
@@ -0,0 +1,64 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation cancellation."""
+
+from _framework.base.packets import _interfaces
+from _framework.base.packets import packets
+
+
+class CancellationManager(_interfaces.CancellationManager):
+ """An implementation of _interfaces.CancellationManager."""
+
+ def __init__(
+ self, lock, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ ingestion_manager: The _interfaces.IngestionManager for the operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ """
+ self._lock = lock
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._ingestion_manager = ingestion_manager
+ self._expiration_manager = expiration_manager
+
+ def cancel(self):
+ """See _interfaces.CancellationManager.cancel for specification."""
+ with self._lock:
+ self._termination_manager.abort(packets.Kind.CANCELLATION)
+ self._transmission_manager.abort(packets.Kind.CANCELLATION)
+ self._ingestion_manager.abort()
+ self._expiration_manager.abort()
diff --git a/src/python/_framework/base/packets/_constants.py b/src/python/_framework/base/packets/_constants.py
new file mode 100644
index 0000000000..8fbdc82782
--- /dev/null
+++ b/src/python/_framework/base/packets/_constants.py
@@ -0,0 +1,32 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Private constants for the package."""
+
+INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Base) internal error! :-('
diff --git a/src/python/_framework/base/packets/_context.py b/src/python/_framework/base/packets/_context.py
new file mode 100644
index 0000000000..be390364b0
--- /dev/null
+++ b/src/python/_framework/base/packets/_context.py
@@ -0,0 +1,99 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation context."""
+
+import time
+
+# _interfaces and packets are referenced from specification in this module.
+from _framework.base import interfaces as base_interfaces
+from _framework.base.packets import _interfaces # pylint: disable=unused-import
+from _framework.base.packets import packets # pylint: disable=unused-import
+
+
+class OperationContext(base_interfaces.OperationContext):
+ """An implementation of base_interfaces.OperationContext."""
+
+ def __init__(
+ self, lock, operation_id, local_failure, termination_manager,
+ transmission_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ operation_id: An object identifying the operation.
+ local_failure: Whichever one of packets.Kind.SERVICED_FAILURE or
+ packets.Kind.SERVICER_FAILURE describes local failure of customer code.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ """
+ self._lock = lock
+ self._local_failure = local_failure
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._ingestion_manager = None
+ self._expiration_manager = None
+
+ self.operation_id = operation_id
+
+ def set_ingestion_and_expiration_managers(
+ self, ingestion_manager, expiration_manager):
+ """Sets managers with which this OperationContext cooperates.
+
+ Args:
+ ingestion_manager: The _interfaces.IngestionManager for the operation.
+ expiration_manager: The _interfaces.ExpirationManager for the operation.
+ """
+ self._ingestion_manager = ingestion_manager
+ self._expiration_manager = expiration_manager
+
+ def is_active(self):
+ """See base_interfaces.OperationContext.is_active for specification."""
+ with self._lock:
+ return self._termination_manager.is_active()
+
+ def add_termination_callback(self, callback):
+ """See base_interfaces.OperationContext.add_termination_callback."""
+ with self._lock:
+ self._termination_manager.add_callback(callback)
+
+ def time_remaining(self):
+ """See interfaces.OperationContext.time_remaining for specification."""
+ with self._lock:
+ deadline = self._expiration_manager.deadline()
+ return max(0.0, deadline - time.time())
+
+ def fail(self, exception):
+ """See interfaces.OperationContext.fail for specification."""
+ with self._lock:
+ self._termination_manager.abort(self._local_failure)
+ self._transmission_manager.abort(self._local_failure)
+ self._ingestion_manager.abort()
+ self._expiration_manager.abort()
diff --git a/src/python/_framework/base/packets/_emission.py b/src/python/_framework/base/packets/_emission.py
new file mode 100644
index 0000000000..b4be5eb0ff
--- /dev/null
+++ b/src/python/_framework/base/packets/_emission.py
@@ -0,0 +1,126 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for handling emitted values."""
+
+# packets is referenced from specifications in this module.
+from _framework.base.packets import _interfaces
+from _framework.base.packets import packets # pylint: disable=unused-import
+
+
+class _EmissionManager(_interfaces.EmissionManager):
+ """An implementation of _interfaces.EmissionManager."""
+
+ def __init__(
+ self, lock, failure_kind, termination_manager, transmission_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or
+ packets.Kind.SERVICER_FAILURE describes this object's methods being
+ called inappropriately by customer code.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ """
+ self._lock = lock
+ self._failure_kind = failure_kind
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._ingestion_manager = None
+ self._expiration_manager = None
+
+ self._emission_complete = False
+
+ def set_ingestion_manager_and_expiration_manager(
+ self, ingestion_manager, expiration_manager):
+ self._ingestion_manager = ingestion_manager
+ self._expiration_manager = expiration_manager
+
+ def _abort(self):
+ self._termination_manager.abort(self._failure_kind)
+ self._transmission_manager.abort(self._failure_kind)
+ self._ingestion_manager.abort()
+ self._expiration_manager.abort()
+
+ def consume(self, value):
+ with self._lock:
+ if self._emission_complete:
+ self._abort()
+ else:
+ self._transmission_manager.inmit(value, False)
+
+ def terminate(self):
+ with self._lock:
+ if not self._emission_complete:
+ self._termination_manager.emission_complete()
+ self._transmission_manager.inmit(None, True)
+ self._emission_complete = True
+
+ def consume_and_terminate(self, value):
+ with self._lock:
+ if self._emission_complete:
+ self._abort()
+ else:
+ self._termination_manager.emission_complete()
+ self._transmission_manager.inmit(value, True)
+ self._emission_complete = True
+
+
+def front_emission_manager(lock, termination_manager, transmission_manager):
+ """Creates an _interfaces.EmissionManager appropriate for front-side use.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the operation.
+
+ Returns:
+ An _interfaces.EmissionManager appropriate for front-side use.
+ """
+ return _EmissionManager(
+ lock, packets.Kind.SERVICED_FAILURE, termination_manager,
+ transmission_manager)
+
+
+def back_emission_manager(lock, termination_manager, transmission_manager):
+ """Creates an _interfaces.EmissionManager appropriate for back-side use.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the operation.
+
+ Returns:
+ An _interfaces.EmissionManager appropriate for back-side use.
+ """
+ return _EmissionManager(
+ lock, packets.Kind.SERVICER_FAILURE, termination_manager,
+ transmission_manager)
diff --git a/src/python/_framework/base/packets/_ends.py b/src/python/_framework/base/packets/_ends.py
new file mode 100644
index 0000000000..baaf5cacf9
--- /dev/null
+++ b/src/python/_framework/base/packets/_ends.py
@@ -0,0 +1,408 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Implementations of Fronts and Backs."""
+
+import collections
+import threading
+import uuid
+
+# _interfaces and packets are referenced from specification in this module.
+from _framework.base import interfaces as base_interfaces
+from _framework.base.packets import _cancellation
+from _framework.base.packets import _context
+from _framework.base.packets import _emission
+from _framework.base.packets import _expiration
+from _framework.base.packets import _ingestion
+from _framework.base.packets import _interfaces # pylint: disable=unused-import
+from _framework.base.packets import _reception
+from _framework.base.packets import _termination
+from _framework.base.packets import _transmission
+from _framework.base.packets import interfaces
+from _framework.base.packets import packets # pylint: disable=unused-import
+from _framework.foundation import callable_util
+
+_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
+
+_OPERATION_OUTCOMES = (
+ base_interfaces.COMPLETED,
+ base_interfaces.CANCELLED,
+ base_interfaces.EXPIRED,
+ base_interfaces.RECEPTION_FAILURE,
+ base_interfaces.TRANSMISSION_FAILURE,
+ base_interfaces.SERVICER_FAILURE,
+ base_interfaces.SERVICED_FAILURE,
+ )
+
+
+class _EasyOperation(base_interfaces.Operation):
+ """A trivial implementation of base_interfaces.Operation."""
+
+ def __init__(self, emission_manager, context, cancellation_manager):
+ """Constructor.
+
+ Args:
+ emission_manager: The _interfaces.EmissionManager for the operation that
+ will accept values emitted by customer code.
+ context: The base_interfaces.OperationContext for use by the customer
+ during the operation.
+ cancellation_manager: The _interfaces.CancellationManager for the
+ operation.
+ """
+ self.consumer = emission_manager
+ self.context = context
+ self._cancellation_manager = cancellation_manager
+
+ def cancel(self):
+ self._cancellation_manager.cancel()
+
+
+class _Endlette(object):
+ """Utility for stateful behavior common to Fronts and Backs."""
+
+ def __init__(self, pool):
+ """Constructor.
+
+ Args:
+ pool: A thread pool to use when calling registered idle actions.
+ """
+ self._lock = threading.Lock()
+ self._pool = pool
+ # Dictionary from operation IDs to ReceptionManager-or-None. A None value
+ # indicates an in-progress fire-and-forget operation for which the customer
+ # has chosen to ignore results.
+ self._operations = {}
+ self._stats = {outcome: 0 for outcome in _OPERATION_OUTCOMES}
+ self._idle_actions = []
+
+ def terminal_action(self, operation_id):
+ """Constructs the termination action for a single operation.
+
+ Args:
+ operation_id: An operation ID.
+
+ Returns:
+ A callable that takes an operation outcome for an argument to be used as
+ the termination action for the operation associated with the given
+ operation ID.
+ """
+ def termination_action(outcome):
+ with self._lock:
+ self._stats[outcome] += 1
+ self._operations.pop(operation_id, None)
+ if not self._operations:
+ for action in self._idle_actions:
+ self._pool.submit(callable_util.with_exceptions_logged(
+ action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE))
+ self._idle_actions = []
+ return termination_action
+
+ def __enter__(self):
+ self._lock.acquire()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._lock.release()
+
+ def get_operation(self, operation_id):
+ return self._operations.get(operation_id, None)
+
+ def add_operation(self, operation_id, operation_reception_manager):
+ self._operations[operation_id] = operation_reception_manager
+
+ def operation_stats(self):
+ with self._lock:
+ return dict(self._stats)
+
+ def add_idle_action(self, action):
+ with self._lock:
+ if self._operations:
+ self._idle_actions.append(action)
+ else:
+ self._pool.submit(callable_util.with_exceptions_logged(
+ action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE))
+
+
+class _FrontManagement(
+ collections.namedtuple(
+ '_FrontManagement',
+ ('reception', 'emission', 'operation', 'cancellation'))):
+ """Just a trivial helper class to bundle four fellow-traveling objects."""
+
+
+def _front_operate(
+ callback, work_pool, transmission_pool, utility_pool,
+ termination_action, operation_id, name, payload, complete, timeout,
+ subscription, trace_id):
+ """Constructs objects necessary for front-side operation management.
+
+ Args:
+ callback: A callable that accepts packets.FrontToBackPackets and delivers
+ them to the other side of the operation. Execution of this callable may
+ take any arbitrary length of time.
+ work_pool: A thread pool in which to execute customer code.
+ transmission_pool: A thread pool to use for transmitting to the other side
+ of the operation.
+ utility_pool: A thread pool for utility tasks.
+ termination_action: A no-arg behavior to be called upon operation
+ completion.
+ operation_id: An object identifying the operation.
+ name: The name of the method being called during the operation.
+ payload: The first customer-significant value to be transmitted to the other
+ side. May be None if there is no such value or if the customer chose not
+ to pass it at operation invocation.
+ complete: A boolean indicating whether or not additional payloads will be
+ supplied by the customer.
+ timeout: A length of time in seconds to allow for the operation.
+ subscription: A base_interfaces.ServicedSubscription describing the
+ customer's interest in the results of the operation.
+ trace_id: A uuid.UUID identifying a set of related operations to which this
+ operation belongs. May be None.
+
+ Returns:
+ A _FrontManagement object bundling together the
+ _interfaces.ReceptionManager, _interfaces.EmissionManager,
+ _context.OperationContext, and _interfaces.CancellationManager for the
+ operation.
+ """
+ lock = threading.Lock()
+ with lock:
+ termination_manager = _termination.front_termination_manager(
+ work_pool, utility_pool, termination_action, subscription.category)
+ transmission_manager = _transmission.front_transmission_manager(
+ lock, transmission_pool, callback, operation_id, name,
+ subscription.category, trace_id, timeout, termination_manager)
+ operation_context = _context.OperationContext(
+ lock, operation_id, packets.Kind.SERVICED_FAILURE,
+ termination_manager, transmission_manager)
+ emission_manager = _emission.front_emission_manager(
+ lock, termination_manager, transmission_manager)
+ ingestion_manager = _ingestion.front_ingestion_manager(
+ lock, work_pool, subscription, termination_manager,
+ transmission_manager, operation_context)
+ expiration_manager = _expiration.front_expiration_manager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ timeout)
+ reception_manager = _reception.front_reception_manager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager)
+ cancellation_manager = _cancellation.CancellationManager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager)
+
+ transmission_manager.set_ingestion_and_expiration_managers(
+ ingestion_manager, expiration_manager)
+ operation_context.set_ingestion_and_expiration_managers(
+ ingestion_manager, expiration_manager)
+ emission_manager.set_ingestion_manager_and_expiration_manager(
+ ingestion_manager, expiration_manager)
+ ingestion_manager.set_expiration_manager(expiration_manager)
+
+ transmission_manager.inmit(payload, complete)
+
+ returned_reception_manager = (
+ None if subscription.category == base_interfaces.NONE
+ else reception_manager)
+
+ return _FrontManagement(
+ returned_reception_manager, emission_manager, operation_context,
+ cancellation_manager)
+
+
+class Front(interfaces.Front):
+ """An implementation of interfaces.Front."""
+
+ def __init__(self, work_pool, transmission_pool, utility_pool):
+ """Constructor.
+
+ Args:
+ work_pool: A thread pool to be used for executing customer code.
+ transmission_pool: A thread pool to be used for transmitting values to
+ the other side of the operation.
+ utility_pool: A thread pool to be used for utility tasks.
+ """
+ self._endlette = _Endlette(utility_pool)
+ self._work_pool = work_pool
+ self._transmission_pool = transmission_pool
+ self._utility_pool = utility_pool
+ self._callback = None
+
+ self._operations = {}
+
+ def join_rear_link(self, rear_link):
+ """See interfaces.ForeLink.join_rear_link for specification."""
+ with self._endlette:
+ self._callback = rear_link.accept_front_to_back_ticket
+
+ def operation_stats(self):
+ """See base_interfaces.End.operation_stats for specification."""
+ return self._endlette.operation_stats()
+
+ def add_idle_action(self, action):
+ """See base_interfaces.End.add_idle_action for specification."""
+ self._endlette.add_idle_action(action)
+
+ def operate(
+ self, name, payload, complete, timeout, subscription, trace_id):
+ """See base_interfaces.Front.operate for specification."""
+ operation_id = uuid.uuid4()
+ with self._endlette:
+ management = _front_operate(
+ self._callback, self._work_pool, self._transmission_pool,
+ self._utility_pool, self._endlette.terminal_action(operation_id),
+ operation_id, name, payload, complete, timeout, subscription,
+ trace_id)
+ self._endlette.add_operation(operation_id, management.reception)
+ return _EasyOperation(
+ management.emission, management.operation, management.cancellation)
+
+ def accept_back_to_front_ticket(self, ticket):
+ """See interfaces.End.act for specification."""
+ with self._endlette:
+ reception_manager = self._endlette.get_operation(ticket.operation_id)
+ if reception_manager:
+ reception_manager.receive_packet(ticket)
+
+
+def _back_operate(
+ servicer, callback, work_pool, transmission_pool, utility_pool,
+ termination_action, ticket, default_timeout, maximum_timeout):
+ """Constructs objects necessary for back-side operation management.
+
+ Also begins back-side operation by feeding the first received ticket into the
+ constructed _interfaces.ReceptionManager.
+
+ Args:
+ servicer: An interfaces.Servicer for servicing operations.
+ callback: A callable that accepts packets.BackToFrontPackets and delivers
+ them to the other side of the operation. Execution of this callable may
+ take any arbitrary length of time.
+ work_pool: A thread pool in which to execute customer code.
+ transmission_pool: A thread pool to use for transmitting to the other side
+ of the operation.
+ utility_pool: A thread pool for utility tasks.
+ termination_action: A no-arg behavior to be called upon operation
+ completion.
+ ticket: The first packets.FrontToBackPacket received for the operation.
+ default_timeout: A length of time in seconds to be used as the default
+ time alloted for a single operation.
+ maximum_timeout: A length of time in seconds to be used as the maximum
+ time alloted for a single operation.
+
+ Returns:
+ The _interfaces.ReceptionManager to be used for the operation.
+ """
+ lock = threading.Lock()
+ with lock:
+ termination_manager = _termination.back_termination_manager(
+ work_pool, utility_pool, termination_action, ticket.subscription)
+ transmission_manager = _transmission.back_transmission_manager(
+ lock, transmission_pool, callback, ticket.operation_id,
+ termination_manager, ticket.subscription)
+ operation_context = _context.OperationContext(
+ lock, ticket.operation_id, packets.Kind.SERVICER_FAILURE,
+ termination_manager, transmission_manager)
+ emission_manager = _emission.back_emission_manager(
+ lock, termination_manager, transmission_manager)
+ ingestion_manager = _ingestion.back_ingestion_manager(
+ lock, work_pool, servicer, termination_manager,
+ transmission_manager, operation_context, emission_manager)
+ expiration_manager = _expiration.back_expiration_manager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ ticket.timeout, default_timeout, maximum_timeout)
+ reception_manager = _reception.back_reception_manager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager)
+
+ transmission_manager.set_ingestion_and_expiration_managers(
+ ingestion_manager, expiration_manager)
+ operation_context.set_ingestion_and_expiration_managers(
+ ingestion_manager, expiration_manager)
+ emission_manager.set_ingestion_manager_and_expiration_manager(
+ ingestion_manager, expiration_manager)
+ ingestion_manager.set_expiration_manager(expiration_manager)
+
+ reception_manager.receive_packet(ticket)
+
+ return reception_manager
+
+
+class Back(interfaces.Back):
+ """An implementation of interfaces.Back."""
+
+ def __init__(
+ self, servicer, work_pool, transmission_pool, utility_pool,
+ default_timeout, maximum_timeout):
+ """Constructor.
+
+ Args:
+ servicer: An interfaces.Servicer for servicing operations.
+ work_pool: A thread pool in which to execute customer code.
+ transmission_pool: A thread pool to use for transmitting to the other side
+ of the operation.
+ utility_pool: A thread pool for utility tasks.
+ default_timeout: A length of time in seconds to be used as the default
+ time alloted for a single operation.
+ maximum_timeout: A length of time in seconds to be used as the maximum
+ time alloted for a single operation.
+ """
+ self._endlette = _Endlette(utility_pool)
+ self._servicer = servicer
+ self._work_pool = work_pool
+ self._transmission_pool = transmission_pool
+ self._utility_pool = utility_pool
+ self._default_timeout = default_timeout
+ self._maximum_timeout = maximum_timeout
+ self._callback = None
+
+ def join_fore_link(self, fore_link):
+ """See interfaces.RearLink.join_fore_link for specification."""
+ with self._endlette:
+ self._callback = fore_link.accept_back_to_front_ticket
+
+ def accept_front_to_back_ticket(self, ticket):
+ """See interfaces.RearLink.accept_front_to_back_ticket for specification."""
+ with self._endlette:
+ reception_manager = self._endlette.get_operation(ticket.operation_id)
+ if reception_manager is None:
+ reception_manager = _back_operate(
+ self._servicer, self._callback, self._work_pool,
+ self._transmission_pool, self._utility_pool,
+ self._endlette.terminal_action(ticket.operation_id), ticket,
+ self._default_timeout, self._maximum_timeout)
+ self._endlette.add_operation(ticket.operation_id, reception_manager)
+ else:
+ reception_manager.receive_packet(ticket)
+
+ def operation_stats(self):
+ """See base_interfaces.End.operation_stats for specification."""
+ return self._endlette.operation_stats()
+
+ def add_idle_action(self, action):
+ """See base_interfaces.End.add_idle_action for specification."""
+ self._endlette.add_idle_action(action)
diff --git a/src/python/_framework/base/packets/_expiration.py b/src/python/_framework/base/packets/_expiration.py
new file mode 100644
index 0000000000..772e15f08c
--- /dev/null
+++ b/src/python/_framework/base/packets/_expiration.py
@@ -0,0 +1,158 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation expiration."""
+
+import time
+
+from _framework.base.packets import _interfaces
+from _framework.base.packets import packets
+from _framework.foundation import later
+
+
+class _ExpirationManager(_interfaces.ExpirationManager):
+ """An implementation of _interfaces.ExpirationManager."""
+
+ def __init__(
+ self, lock, termination_manager, transmission_manager, ingestion_manager,
+ commencement, timeout, maximum_timeout):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ ingestion_manager: The _interfaces.IngestionManager for the operation.
+ commencement: The time in seconds since the epoch at which the operation
+ began.
+ timeout: A length of time in seconds to allow for the operation to run.
+ maximum_timeout: The maximum length of time in seconds to allow for the
+ operation to run despite what is requested via this object's
+ change_timout method.
+ """
+ self._lock = lock
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._ingestion_manager = ingestion_manager
+ self._commencement = commencement
+ self._maximum_timeout = maximum_timeout
+
+ self._timeout = timeout
+ self._deadline = commencement + timeout
+ self._index = None
+ self._future = None
+
+ def _expire(self, index):
+ with self._lock:
+ if self._future is not None and index == self._index:
+ self._future = None
+ self._termination_manager.abort(packets.Kind.EXPIRATION)
+ self._transmission_manager.abort(packets.Kind.EXPIRATION)
+ self._ingestion_manager.abort()
+
+ def start(self):
+ self._index = 0
+ self._future = later.later(self._timeout, lambda: self._expire(0))
+
+ def change_timeout(self, timeout):
+ if self._future is not None and timeout != self._timeout:
+ self._future.cancel()
+ new_timeout = min(timeout, self._maximum_timeout)
+ new_index = self._index + 1
+ self._timeout = new_timeout
+ self._deadline = self._commencement + new_timeout
+ self._index = new_index
+ delay = self._deadline - time.time()
+ self._future = later.later(
+ delay, lambda: self._expire(new_index))
+
+ def deadline(self):
+ return self._deadline
+
+ def abort(self):
+ if self._future:
+ self._future.cancel()
+ self._future = None
+ self._deadline_index = None
+
+
+def front_expiration_manager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ timeout):
+ """Creates an _interfaces.ExpirationManager appropriate for front-side use.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ ingestion_manager: The _interfaces.IngestionManager for the operation.
+ timeout: A length of time in seconds to allow for the operation to run.
+
+ Returns:
+ An _interfaces.ExpirationManager appropriate for front-side use.
+ """
+ commencement = time.time()
+ expiration_manager = _ExpirationManager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ commencement, timeout, timeout)
+ expiration_manager.start()
+ return expiration_manager
+
+
+def back_expiration_manager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ timeout, default_timeout, maximum_timeout):
+ """Creates an _interfaces.ExpirationManager appropriate for back-side use.
+
+ Args:
+ lock: The operation-wide lock.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ ingestion_manager: The _interfaces.IngestionManager for the operation.
+ timeout: A length of time in seconds to allow for the operation to run. May
+ be None in which case default_timeout will be used.
+ default_timeout: The default length of time in seconds to allow for the
+ operation to run if the front-side customer has not specified such a value
+ (or if the value they specified is not yet known).
+ maximum_timeout: The maximum length of time in seconds to allow for the
+ operation to run.
+
+ Returns:
+ An _interfaces.ExpirationManager appropriate for back-side use.
+ """
+ commencement = time.time()
+ expiration_manager = _ExpirationManager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ commencement, default_timeout if timeout is None else timeout,
+ maximum_timeout)
+ expiration_manager.start()
+ return expiration_manager
diff --git a/src/python/_framework/base/packets/_ingestion.py b/src/python/_framework/base/packets/_ingestion.py
new file mode 100644
index 0000000000..ad5ed4cada
--- /dev/null
+++ b/src/python/_framework/base/packets/_ingestion.py
@@ -0,0 +1,440 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ingestion during an operation."""
+
+import abc
+import collections
+
+from _framework.base import exceptions
+from _framework.base import interfaces
+from _framework.base.packets import _constants
+from _framework.base.packets import _interfaces
+from _framework.base.packets import packets
+from _framework.foundation import abandonment
+from _framework.foundation import callable_util
+from _framework.foundation import stream
+
+_CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!'
+_CONSUME_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
+
+
+class _ConsumerCreation(collections.namedtuple(
+ '_ConsumerCreation', ('consumer', 'remote_error', 'abandoned'))):
+ """A sum type for the outcome of ingestion initialization.
+
+ Either consumer will be non-None, remote_error will be True, or abandoned will
+ be True.
+
+ Attributes:
+ consumer: A stream.Consumer for ingesting payloads.
+ remote_error: A boolean indicating that the consumer could not be created
+ due to an error on the remote side of the operation.
+ abandoned: A boolean indicating that the consumer creation was abandoned.
+ """
+
+
+class _EmptyConsumer(stream.Consumer):
+ """A no-operative stream.Consumer that ignores all inputs and calls."""
+
+ def consume(self, value):
+ """See stream.Consumer.consume for specification."""
+
+ def terminate(self):
+ """See stream.Consumer.terminate for specification."""
+
+ def consume_and_terminate(self, value):
+ """See stream.Consumer.consume_and_terminate for specification."""
+
+
+class _ConsumerCreator(object):
+ """Common specification of different consumer-creating behavior."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def create_consumer(self, requirement):
+ """Creates the stream.Consumer to which customer payloads will be delivered.
+
+ Any exceptions raised by this method should be attributed to and treated as
+ defects in the serviced or servicer code called by this method.
+
+ Args:
+ requirement: A value required by this _ConsumerCreator for consumer
+ creation.
+
+ Returns:
+ A _ConsumerCreation describing the result of consumer creation.
+ """
+ raise NotImplementedError()
+
+
+class _FrontConsumerCreator(_ConsumerCreator):
+ """A _ConsumerCreator appropriate for front-side use."""
+
+ def __init__(self, subscription, operation_context):
+ """Constructor.
+
+ Args:
+ subscription: The serviced's interfaces.ServicedSubscription for the
+ operation.
+ operation_context: The interfaces.OperationContext object for the
+ operation.
+ """
+ self._subscription = subscription
+ self._operation_context = operation_context
+
+ def create_consumer(self, requirement):
+ """See _ConsumerCreator.create_consumer for specification."""
+ if self._subscription.category == interfaces.FULL:
+ try:
+ return _ConsumerCreation(
+ self._subscription.ingestor.consumer(self._operation_context),
+ False, False)
+ except abandonment.Abandoned:
+ return _ConsumerCreation(None, False, True)
+ else:
+ return _ConsumerCreation(_EmptyConsumer(), False, False)
+
+
+class _BackConsumerCreator(_ConsumerCreator):
+ """A _ConsumerCreator appropriate for back-side use."""
+
+ def __init__(self, servicer, operation_context, emission_consumer):
+ """Constructor.
+
+ Args:
+ servicer: The interfaces.Servicer that will service the operation.
+ operation_context: The interfaces.OperationContext object for the
+ operation.
+ emission_consumer: The stream.Consumer object to which payloads emitted
+ from the operation will be passed.
+ """
+ self._servicer = servicer
+ self._operation_context = operation_context
+ self._emission_consumer = emission_consumer
+
+ def create_consumer(self, requirement):
+ """See _ConsumerCreator.create_consumer for full specification.
+
+ Args:
+ requirement: The name of the Servicer method to be called during this
+ operation.
+
+ Returns:
+ A _ConsumerCreation describing the result of consumer creation.
+ """
+ try:
+ return _ConsumerCreation(
+ self._servicer.service(
+ requirement, self._operation_context, self._emission_consumer),
+ False, False)
+ except exceptions.NoSuchMethodError:
+ return _ConsumerCreation(None, True, False)
+ except abandonment.Abandoned:
+ return _ConsumerCreation(None, False, True)
+
+
+class _WrappedConsumer(object):
+ """Wraps a consumer to catch the exceptions that it is allowed to throw."""
+
+ def __init__(self, consumer):
+ """Constructor.
+
+ Args:
+ consumer: A stream.Consumer that may raise abandonment.Abandoned from any
+ of its methods.
+ """
+ self._consumer = consumer
+
+ def moar(self, payload, complete):
+ """Makes progress with the wrapped consumer.
+
+ This method catches all exceptions allowed to be thrown by the wrapped
+ consumer. Any exceptions raised by this method should be blamed on the
+ customer-supplied consumer.
+
+ Args:
+ payload: A customer-significant payload object. May be None only if
+ complete is True.
+ complete: Whether or not the end of the payload sequence has been reached.
+ May be False only if payload is not None.
+
+ Returns:
+ True if the wrapped consumer made progress or False if the wrapped
+ consumer raised abandonment.Abandoned to indicate its abandonment of
+ progress.
+ """
+ try:
+ if payload:
+ if complete:
+ self._consumer.consume_and_terminate(payload)
+ else:
+ self._consumer.consume(payload)
+ else:
+ self._consumer.terminate()
+ return True
+ except abandonment.Abandoned:
+ return False
+
+
+class _IngestionManager(_interfaces.IngestionManager):
+ """An implementation of _interfaces.IngestionManager."""
+
+ def __init__(
+ self, lock, pool, consumer_creator, failure_kind, termination_manager,
+ transmission_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-wide lock.
+ pool: A thread pool in which to execute customer code.
+ consumer_creator: A _ConsumerCreator wrapping the portion of customer code
+ that when called returns the stream.Consumer with which the customer
+ code will ingest payload values.
+ failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or
+ packets.Kind.SERVICER_FAILURE describes local failure of customer code.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ """
+ self._lock = lock
+ self._pool = pool
+ self._consumer_creator = consumer_creator
+ self._failure_kind = failure_kind
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._expiration_manager = None
+
+ self._wrapped_ingestion_consumer = None
+ self._pending_ingestion = []
+ self._ingestion_complete = False
+ self._processing = False
+
+ def set_expiration_manager(self, expiration_manager):
+ self._expiration_manager = expiration_manager
+
+ def _abort_internal_only(self):
+ self._wrapped_ingestion_consumer = None
+ self._pending_ingestion = None
+
+ def _abort_and_notify(self, outcome):
+ self._abort_internal_only()
+ self._termination_manager.abort(outcome)
+ self._transmission_manager.abort(outcome)
+ self._expiration_manager.abort()
+
+ def _next(self):
+ """Computes the next step for ingestion.
+
+ Returns:
+ A payload, complete, continue triplet indicating what payload (if any) is
+ available to feed into customer code, whether or not the sequence of
+ payloads has terminated, and whether or not there is anything
+ immediately actionable to call customer code to do.
+ """
+ if self._pending_ingestion is None:
+ return None, False, False
+ elif self._pending_ingestion:
+ payload = self._pending_ingestion.pop(0)
+ complete = self._ingestion_complete and not self._pending_ingestion
+ return payload, complete, True
+ elif self._ingestion_complete:
+ return None, True, True
+ else:
+ return None, False, False
+
+ def _process(self, wrapped_ingestion_consumer, payload, complete):
+ """A method to call to execute customer code.
+
+ This object's lock must *not* be held when calling this method.
+
+ Args:
+ wrapped_ingestion_consumer: The _WrappedConsumer with which to pass
+ payloads to customer code.
+ payload: A customer payload. May be None only if complete is True.
+ complete: Whether or not the sequence of payloads to pass to the customer
+ has concluded.
+ """
+ while True:
+ consumption_outcome = callable_util.call_logging_exceptions(
+ wrapped_ingestion_consumer.moar, _CONSUME_EXCEPTION_LOG_MESSAGE,
+ payload, complete)
+ if consumption_outcome.exception is None:
+ if consumption_outcome.return_value:
+ with self._lock:
+ if complete:
+ self._pending_ingestion = None
+ self._termination_manager.ingestion_complete()
+ return
+ else:
+ payload, complete, moar = self._next()
+ if not moar:
+ self._processing = False
+ return
+ else:
+ with self._lock:
+ if self._pending_ingestion is not None:
+ self._abort_and_notify(self._failure_kind)
+ self._processing = False
+ return
+ else:
+ with self._lock:
+ self._abort_and_notify(self._failure_kind)
+ self._processing = False
+ return
+
+ def start(self, requirement):
+ if self._pending_ingestion is not None:
+ def initialize():
+ consumer_creation_outcome = callable_util.call_logging_exceptions(
+ self._consumer_creator.create_consumer,
+ _CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE, requirement)
+ if consumer_creation_outcome.return_value is None:
+ with self._lock:
+ self._abort_and_notify(self._failure_kind)
+ self._processing = False
+ elif consumer_creation_outcome.return_value.remote_error:
+ with self._lock:
+ self._abort_and_notify(packets.Kind.RECEPTION_FAILURE)
+ self._processing = False
+ elif consumer_creation_outcome.return_value.abandoned:
+ with self._lock:
+ if self._pending_ingestion is not None:
+ self._abort_and_notify(self._failure_kind)
+ self._processing = False
+ else:
+ wrapped_ingestion_consumer = _WrappedConsumer(
+ consumer_creation_outcome.return_value.consumer)
+ with self._lock:
+ self._wrapped_ingestion_consumer = wrapped_ingestion_consumer
+ payload, complete, moar = self._next()
+ if not moar:
+ self._processing = False
+ return
+
+ self._process(wrapped_ingestion_consumer, payload, complete)
+
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ initialize, _constants.INTERNAL_ERROR_LOG_MESSAGE))
+ self._processing = True
+
+ def consume(self, payload):
+ if self._ingestion_complete:
+ self._abort_and_notify(self._failure_kind)
+ elif self._pending_ingestion is not None:
+ if self._processing:
+ self._pending_ingestion.append(payload)
+ else:
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ self._wrapped_ingestion_consumer, payload, False)
+ self._processing = True
+
+ def terminate(self):
+ if self._ingestion_complete:
+ self._abort_and_notify(self._failure_kind)
+ else:
+ self._ingestion_complete = True
+ if self._pending_ingestion is not None and not self._processing:
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ self._wrapped_ingestion_consumer, None, True)
+ self._processing = True
+
+ def consume_and_terminate(self, payload):
+ if self._ingestion_complete:
+ self._abort_and_notify(self._failure_kind)
+ else:
+ self._ingestion_complete = True
+ if self._pending_ingestion is not None:
+ if self._processing:
+ self._pending_ingestion.append(payload)
+ else:
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ self._process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ self._wrapped_ingestion_consumer, payload, True)
+ self._processing = True
+
+ def abort(self):
+ """See _interfaces.IngestionManager.abort for specification."""
+ self._abort_internal_only()
+
+
+def front_ingestion_manager(
+ lock, pool, subscription, termination_manager, transmission_manager,
+ operation_context):
+ """Creates an IngestionManager appropriate for front-side use.
+
+ Args:
+ lock: The operation-wide lock.
+ pool: A thread pool in which to execute customer code.
+ subscription: A base_interfaces.ServicedSubscription indicating the
+ customer's interest in the results of the operation.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ operation_context: A base_interfaces.OperationContext for the operation.
+
+ Returns:
+ An IngestionManager appropriate for front-side use.
+ """
+ ingestion_manager = _IngestionManager(
+ lock, pool, _FrontConsumerCreator(subscription, operation_context),
+ packets.Kind.SERVICED_FAILURE, termination_manager, transmission_manager)
+ ingestion_manager.start(None)
+ return ingestion_manager
+
+
+def back_ingestion_manager(
+ lock, pool, servicer, termination_manager, transmission_manager,
+ operation_context, emission_consumer):
+ """Creates an IngestionManager appropriate for back-side use.
+
+ Args:
+ lock: The operation-wide lock.
+ pool: A thread pool in which to execute customer code.
+ servicer: A base_interfaces.Servicer for servicing the operation.
+ termination_manager: The _interfaces.TerminationManager for the operation.
+ transmission_manager: The _interfaces.TransmissionManager for the
+ operation.
+ operation_context: A base_interfaces.OperationContext for the operation.
+ emission_consumer: The _interfaces.EmissionConsumer for the operation.
+
+ Returns:
+ An IngestionManager appropriate for back-side use.
+ """
+ ingestion_manager = _IngestionManager(
+ lock, pool, _BackConsumerCreator(
+ servicer, operation_context, emission_consumer),
+ packets.Kind.SERVICER_FAILURE, termination_manager, transmission_manager)
+ return ingestion_manager
diff --git a/src/python/_framework/base/packets/_interfaces.py b/src/python/_framework/base/packets/_interfaces.py
new file mode 100644
index 0000000000..5f6c0593d0
--- /dev/null
+++ b/src/python/_framework/base/packets/_interfaces.py
@@ -0,0 +1,269 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Package-internal interfaces."""
+
+import abc
+
+# base_interfaces and packets are referenced from specification in this module.
+from _framework.base import interfaces as base_interfaces # pylint: disable=unused-import
+from _framework.base.packets import packets # pylint: disable=unused-import
+from _framework.foundation import stream
+
+
+class TerminationManager(object):
+ """An object responsible for handling the termination of an operation."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def is_active(self):
+ """Reports whether or not the operation is active.
+
+ Returns:
+ True if the operation is active or False if the operation has terminated.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_callback(self, callback):
+ """Registers a callback to be called on operation termination.
+
+ If the operation has already terminated, the callback will be called
+ immediately.
+
+ Args:
+ callback: A callable that will be passed one of base_interfaces.COMPLETED,
+ base_interfaces.CANCELLED, base_interfaces.EXPIRED,
+ base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE,
+ base_interfaces.SERVICER_FAILURE, or base_interfaces.SERVICED_FAILURE.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def emission_complete(self):
+ """Indicates that emissions from customer code have completed."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def transmission_complete(self):
+ """Indicates that transmissions to the remote end are complete."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def ingestion_complete(self):
+ """Indicates that customer code ingestion of received values is complete."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def abort(self, kind):
+ """Indicates that the operation must abort for the indicated reason.
+
+ Args:
+ kind: A value of packets.Kind indicating operation abortion.
+ """
+ raise NotImplementedError()
+
+
+class TransmissionManager(object):
+ """A manager responsible for transmitting to the other end of an operation."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def inmit(self, emission, complete):
+ """Accepts a value for transmission to the other end of the operation.
+
+ Args:
+ emission: A value of some significance to the customer to be transmitted
+ to the other end of the operation. May be None only if complete is True.
+ complete: A boolean that if True indicates that customer code has emitted
+ all values it intends to emit.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def abort(self, kind):
+ """Indicates that the operation has aborted for the indicated reason.
+
+ Args:
+ kind: A value of packets.Kind indicating operation abortion.
+ """
+ raise NotImplementedError()
+
+
+class EmissionManager(stream.Consumer):
+ """A manager of values emitted by customer code."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_ingestion_manager_and_expiration_manager(
+ self, ingestion_manager, expiration_manager):
+ """Sets two other objects with which this EmissionManager will cooperate.
+
+ Args:
+ ingestion_manager: The IngestionManager for the operation.
+ expiration_manager: The ExpirationManager for the operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def consume(self, value):
+ """Accepts a value emitted by customer code.
+
+ This method should only be called by customer code.
+
+ Args:
+ value: Any value of significance to the customer.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self):
+ """Indicates that no more values will be emitted by customer code.
+
+ This method should only be called by customer code.
+
+ Implementations of this method may be idempotent and forgive customer code
+ calling this method more than once.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def consume_and_terminate(self, value):
+ """Accepts the last value emitted by customer code.
+
+ This method should only be called by customer code.
+
+ Args:
+ value: Any value of significance to the customer.
+ """
+ raise NotImplementedError()
+
+
+class IngestionManager(stream.Consumer):
+ """A manager responsible for executing customer code."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_expiration_manager(self, expiration_manager):
+ """Sets the ExpirationManager with which this object will cooperate."""
+
+ @abc.abstractmethod
+ def start(self, requirement):
+ """Commences execution of customer code.
+
+ Args:
+ requirement: Some value unavailable at the time of this object's
+ construction that is required to begin executing customer code.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def consume(self, payload):
+ """Accepts a customer-significant value to be supplied to customer code.
+
+ Args:
+ payload: Some customer-significant value.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self):
+ """Indicates the end of values to be supplied to customer code."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def consume_and_terminate(self, payload):
+ """Accepts the last value to be supplied to customer code.
+
+ Args:
+ payload: Some customer-significant value (and the last such value).
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def abort(self):
+ """Indicates to this manager that the operation has aborted."""
+ raise NotImplementedError()
+
+
+class ExpirationManager(object):
+ """A manager responsible for aborting the operation if it runs out of time."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def change_timeout(self, timeout):
+ """Changes the timeout allotted for the operation.
+
+ Operation duration is always measure from the beginning of the operation;
+ calling this method changes the operation's allotted time to timeout total
+ seconds, not timeout seconds from the time of this method call.
+
+ Args:
+ timeout: A length of time in seconds to allow for the operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def deadline(self):
+ """Returns the time until which the operation is allowed to run.
+
+ Returns:
+ The time (seconds since the epoch) at which the operation will expire.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def abort(self):
+ """Indicates to this manager that the operation has aborted."""
+ raise NotImplementedError()
+
+
+class ReceptionManager(object):
+ """A manager responsible for receiving packets from the other end."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def receive_packet(self, packet):
+ """Handle a packet from the other side of the operation.
+
+ Args:
+ packet: A packets.BackToFrontPacket or packets.FrontToBackPacket
+ appropriate to this end of the operation and this object.
+ """
+ raise NotImplementedError()
+
+
+class CancellationManager(object):
+ """A manager of operation cancellation."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the operation."""
+ raise NotImplementedError()
diff --git a/src/python/_framework/base/packets/_reception.py b/src/python/_framework/base/packets/_reception.py
new file mode 100644
index 0000000000..a2a3823d28
--- /dev/null
+++ b/src/python/_framework/base/packets/_reception.py
@@ -0,0 +1,394 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for packet reception."""
+
+import abc
+
+from _framework.base.packets import _interfaces
+from _framework.base.packets import packets
+
+
+class _Receiver(object):
+ """Common specification of different packet-handling behavior."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def abort_if_abortive(self, packet):
+ """Aborts the operation if the packet is abortive.
+
+ Args:
+ packet: A just-arrived packet.
+
+ Returns:
+ A boolean indicating whether or not this Receiver aborted the operation
+ based on the packet.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def receive(self, packet):
+ """Handles a just-arrived packet.
+
+ Args:
+ packet: A just-arrived packet.
+
+ Returns:
+ A boolean indicating whether or not the packet was terminal (i.e. whether
+ or not non-abortive packets are legal after this one).
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def reception_failure(self):
+ """Aborts the operation with an indication of reception failure."""
+ raise NotImplementedError()
+
+
+def _abort(
+ category, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Indicates abortion with the given category to the given managers."""
+ termination_manager.abort(category)
+ transmission_manager.abort(category)
+ ingestion_manager.abort()
+ expiration_manager.abort()
+
+
+def _abort_if_abortive(
+ packet, abortive, termination_manager, transmission_manager,
+ ingestion_manager, expiration_manager):
+ """Determines a packet's being abortive and if so aborts the operation.
+
+ Args:
+ packet: A just-arrived packet.
+ abortive: A callable that takes a packet and returns an operation category
+ indicating that the operation should be aborted or None indicating that
+ the operation should not be aborted.
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+
+ Returns:
+ True if the operation was aborted; False otherwise.
+ """
+ abort_category = abortive(packet)
+ if abort_category is None:
+ return False
+ else:
+ _abort(
+ abort_category, termination_manager, transmission_manager,
+ ingestion_manager, expiration_manager)
+ return True
+
+
+def _reception_failure(
+ termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Aborts the operation with an indication of reception failure."""
+ _abort(
+ packets.Kind.RECEPTION_FAILURE, termination_manager, transmission_manager,
+ ingestion_manager, expiration_manager)
+
+
+class _BackReceiver(_Receiver):
+ """Packet-handling specific to the back side of an operation."""
+
+ def __init__(
+ self, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Constructor.
+
+ Args:
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+ """
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._ingestion_manager = ingestion_manager
+ self._expiration_manager = expiration_manager
+
+ self._first_packet_seen = False
+ self._last_packet_seen = False
+
+ def _abortive(self, packet):
+ """Determines whether or not (and if so, how) a packet is abortive.
+
+ Args:
+ packet: A just-arrived packet.
+
+ Returns:
+ One of packets.Kind.CANCELLATION, packets.Kind.SERVICED_FAILURE, or
+ packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive
+ and how, or None, indicating that the packet is not abortive.
+ """
+ if packet.kind is packets.Kind.CANCELLATION:
+ return packets.Kind.CANCELLATION
+ elif packet.kind is packets.Kind.EXPIRATION:
+ return packets.Kind.EXPIRATION
+ elif packet.kind is packets.Kind.SERVICED_FAILURE:
+ return packets.Kind.SERVICED_FAILURE
+ elif packet.kind is packets.Kind.RECEPTION_FAILURE:
+ return packets.Kind.SERVICED_FAILURE
+ elif (packet.kind in (packets.Kind.COMMENCEMENT, packets.Kind.ENTIRE) and
+ self._first_packet_seen):
+ return packets.Kind.RECEPTION_FAILURE
+ elif self._last_packet_seen:
+ return packets.Kind.RECEPTION_FAILURE
+ else:
+ return None
+
+ def abort_if_abortive(self, packet):
+ """See _Receiver.abort_if_abortive for specification."""
+ return _abort_if_abortive(
+ packet, self._abortive, self._termination_manager,
+ self._transmission_manager, self._ingestion_manager,
+ self._expiration_manager)
+
+ def receive(self, packet):
+ """See _Receiver.receive for specification."""
+ if packet.timeout is not None:
+ self._expiration_manager.change_timeout(packet.timeout)
+
+ if packet.kind is packets.Kind.COMMENCEMENT:
+ self._first_packet_seen = True
+ self._ingestion_manager.start(packet.name)
+ if packet.payload is not None:
+ self._ingestion_manager.consume(packet.payload)
+ elif packet.kind is packets.Kind.CONTINUATION:
+ self._ingestion_manager.consume(packet.payload)
+ elif packet.kind is packets.Kind.COMPLETION:
+ self._last_packet_seen = True
+ if packet.payload is None:
+ self._ingestion_manager.terminate()
+ else:
+ self._ingestion_manager.consume_and_terminate(packet.payload)
+ else:
+ self._first_packet_seen = True
+ self._last_packet_seen = True
+ self._ingestion_manager.start(packet.name)
+ if packet.payload is None:
+ self._ingestion_manager.terminate()
+ else:
+ self._ingestion_manager.consume_and_terminate(packet.payload)
+
+ def reception_failure(self):
+ """See _Receiver.reception_failure for specification."""
+ _reception_failure(
+ self._termination_manager, self._transmission_manager,
+ self._ingestion_manager, self._expiration_manager)
+
+
+class _FrontReceiver(_Receiver):
+ """Packet-handling specific to the front side of an operation."""
+
+ def __init__(
+ self, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Constructor.
+
+ Args:
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+ """
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._ingestion_manager = ingestion_manager
+ self._expiration_manager = expiration_manager
+
+ self._last_packet_seen = False
+
+ def _abortive(self, packet):
+ """Determines whether or not (and if so, how) a packet is abortive.
+
+ Args:
+ packet: A just-arrived packet.
+
+ Returns:
+ One of packets.Kind.EXPIRATION, packets.Kind.SERVICER_FAILURE, or
+ packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive
+ and how, or None, indicating that the packet is not abortive.
+ """
+ if packet.kind is packets.Kind.EXPIRATION:
+ return packets.Kind.EXPIRATION
+ elif packet.kind is packets.Kind.SERVICER_FAILURE:
+ return packets.Kind.SERVICER_FAILURE
+ elif packet.kind is packets.Kind.RECEPTION_FAILURE:
+ return packets.Kind.SERVICER_FAILURE
+ elif self._last_packet_seen:
+ return packets.Kind.RECEPTION_FAILURE
+ else:
+ return None
+
+ def abort_if_abortive(self, packet):
+ """See _Receiver.abort_if_abortive for specification."""
+ return _abort_if_abortive(
+ packet, self._abortive, self._termination_manager,
+ self._transmission_manager, self._ingestion_manager,
+ self._expiration_manager)
+
+ def receive(self, packet):
+ """See _Receiver.receive for specification."""
+ if packet.kind is packets.Kind.CONTINUATION:
+ self._ingestion_manager.consume(packet.payload)
+ elif packet.kind is packets.Kind.COMPLETION:
+ self._last_packet_seen = True
+ if packet.payload is None:
+ self._ingestion_manager.terminate()
+ else:
+ self._ingestion_manager.consume_and_terminate(packet.payload)
+
+ def reception_failure(self):
+ """See _Receiver.reception_failure for specification."""
+ _reception_failure(
+ self._termination_manager, self._transmission_manager,
+ self._ingestion_manager, self._expiration_manager)
+
+
+class _ReceptionManager(_interfaces.ReceptionManager):
+ """A ReceptionManager based around a _Receiver passed to it."""
+
+ def __init__(self, lock, receiver):
+ """Constructor.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ receiver: A _Receiver responsible for handling received packets.
+ """
+ self._lock = lock
+ self._receiver = receiver
+
+ self._lowest_unseen_sequence_number = 0
+ self._out_of_sequence_packets = {}
+ self._completed_sequence_number = None
+ self._aborted = False
+
+ def _sequence_failure(self, packet):
+ """Determines a just-arrived packet's sequential legitimacy.
+
+ Args:
+ packet: A just-arrived packet.
+
+ Returns:
+ True if the packet is sequentially legitimate; False otherwise.
+ """
+ if packet.sequence_number < self._lowest_unseen_sequence_number:
+ return True
+ elif packet.sequence_number in self._out_of_sequence_packets:
+ return True
+ elif (self._completed_sequence_number is not None and
+ self._completed_sequence_number <= packet.sequence_number):
+ return True
+ else:
+ return False
+
+ def _process(self, packet):
+ """Process those packets ready to be processed.
+
+ Args:
+ packet: A just-arrived packet the sequence number of which matches this
+ _ReceptionManager's _lowest_unseen_sequence_number field.
+ """
+ while True:
+ completed = self._receiver.receive(packet)
+ if completed:
+ self._out_of_sequence_packets.clear()
+ self._completed_sequence_number = packet.sequence_number
+ self._lowest_unseen_sequence_number = packet.sequence_number + 1
+ return
+ else:
+ next_packet = self._out_of_sequence_packets.pop(
+ packet.sequence_number + 1, None)
+ if next_packet is None:
+ self._lowest_unseen_sequence_number = packet.sequence_number + 1
+ return
+ else:
+ packet = next_packet
+
+ def receive_packet(self, packet):
+ """See _interfaces.ReceptionManager.receive_packet for specification."""
+ with self._lock:
+ if self._aborted:
+ return
+ elif self._sequence_failure(packet):
+ self._receiver.reception_failure()
+ self._aborted = True
+ elif self._receiver.abort_if_abortive(packet):
+ self._aborted = True
+ elif packet.sequence_number == self._lowest_unseen_sequence_number:
+ self._process(packet)
+ else:
+ self._out_of_sequence_packets[packet.sequence_number] = packet
+
+
+def front_reception_manager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Creates a _interfaces.ReceptionManager for front-side use.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+
+ Returns:
+ A _interfaces.ReceptionManager appropriate for front-side use.
+ """
+ return _ReceptionManager(
+ lock, _FrontReceiver(
+ termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager))
+
+
+def back_reception_manager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Creates a _interfaces.ReceptionManager for back-side use.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+
+ Returns:
+ A _interfaces.ReceptionManager appropriate for back-side use.
+ """
+ return _ReceptionManager(
+ lock, _BackReceiver(
+ termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager))
diff --git a/src/python/_framework/base/packets/_termination.py b/src/python/_framework/base/packets/_termination.py
new file mode 100644
index 0000000000..d586c2167b
--- /dev/null
+++ b/src/python/_framework/base/packets/_termination.py
@@ -0,0 +1,201 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for operation termination."""
+
+from _framework.base import interfaces
+from _framework.base.packets import _constants
+from _framework.base.packets import _interfaces
+from _framework.base.packets import packets
+from _framework.foundation import callable_util
+
+_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!'
+
+# TODO(nathaniel): enum module.
+_EMISSION = 'emission'
+_TRANSMISSION = 'transmission'
+_INGESTION = 'ingestion'
+
+_FRONT_NOT_LISTENING_REQUIREMENTS = (_TRANSMISSION,)
+_BACK_NOT_LISTENING_REQUIREMENTS = (_EMISSION, _INGESTION,)
+_LISTENING_REQUIREMENTS = (_TRANSMISSION, _INGESTION,)
+
+_KINDS_TO_OUTCOMES = {
+ packets.Kind.COMPLETION: interfaces.COMPLETED,
+ packets.Kind.CANCELLATION: interfaces.CANCELLED,
+ packets.Kind.EXPIRATION: interfaces.EXPIRED,
+ packets.Kind.RECEPTION_FAILURE: interfaces.RECEPTION_FAILURE,
+ packets.Kind.TRANSMISSION_FAILURE: interfaces.TRANSMISSION_FAILURE,
+ packets.Kind.SERVICER_FAILURE: interfaces.SERVICER_FAILURE,
+ packets.Kind.SERVICED_FAILURE: interfaces.SERVICED_FAILURE,
+ }
+
+
+class _TerminationManager(_interfaces.TerminationManager):
+ """An implementation of _interfaces.TerminationManager."""
+
+ def __init__(
+ self, work_pool, utility_pool, action, requirements, local_failure):
+ """Constructor.
+
+ Args:
+ work_pool: A thread pool in which customer work will be done.
+ utility_pool: A thread pool in which work utility work will be done.
+ action: An action to call on operation termination.
+ requirements: A combination of _EMISSION, _TRANSMISSION, and _INGESTION
+ identifying what must finish for the operation to be considered
+ completed.
+ local_failure: A packets.Kind specifying what constitutes local failure of
+ customer work.
+ """
+ self._work_pool = work_pool
+ self._utility_pool = utility_pool
+ self._action = action
+ self._local_failure = local_failure
+ self._has_locally_failed = False
+
+ self._outstanding_requirements = set(requirements)
+ self._kind = None
+ self._callbacks = []
+
+ def _terminate(self, kind):
+ """Terminates the operation.
+
+ Args:
+ kind: One of packets.Kind.COMPLETION, packets.Kind.CANCELLATION,
+ packets.Kind.EXPIRATION, packets.Kind.RECEPTION_FAILURE,
+ packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or
+ packets.Kind.SERVICED_FAILURE.
+ """
+ self._outstanding_requirements = None
+ callbacks = list(self._callbacks)
+ self._callbacks = None
+ self._kind = kind
+ outcome = _KINDS_TO_OUTCOMES[kind]
+
+ act = callable_util.with_exceptions_logged(
+ self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
+
+ if self._has_locally_failed:
+ self._utility_pool.submit(act, outcome)
+ else:
+ def call_callbacks_and_act(callbacks, outcome):
+ for callback in callbacks:
+ callback_outcome = callable_util.call_logging_exceptions(
+ callback, _CALLBACK_EXCEPTION_LOG_MESSAGE, outcome)
+ if callback_outcome.exception is not None:
+ outcome = _KINDS_TO_OUTCOMES[self._local_failure]
+ break
+ self._utility_pool.submit(act, outcome)
+
+ self._work_pool.submit(callable_util.with_exceptions_logged(
+ call_callbacks_and_act,
+ _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ callbacks, outcome)
+
+ def is_active(self):
+ """See _interfaces.TerminationManager.is_active for specification."""
+ return self._outstanding_requirements is not None
+
+ def add_callback(self, callback):
+ """See _interfaces.TerminationManager.add_callback for specification."""
+ if not self._has_locally_failed:
+ if self._outstanding_requirements is None:
+ self._work_pool.submit(
+ callable_util.with_exceptions_logged(
+ callback, _CALLBACK_EXCEPTION_LOG_MESSAGE),
+ _KINDS_TO_OUTCOMES[self._kind])
+ else:
+ self._callbacks.append(callback)
+
+ def emission_complete(self):
+ """See superclass method for specification."""
+ if self._outstanding_requirements is not None:
+ self._outstanding_requirements.discard(_EMISSION)
+ if not self._outstanding_requirements:
+ self._terminate(packets.Kind.COMPLETION)
+
+ def transmission_complete(self):
+ """See superclass method for specification."""
+ if self._outstanding_requirements is not None:
+ self._outstanding_requirements.discard(_TRANSMISSION)
+ if not self._outstanding_requirements:
+ self._terminate(packets.Kind.COMPLETION)
+
+ def ingestion_complete(self):
+ """See superclass method for specification."""
+ if self._outstanding_requirements is not None:
+ self._outstanding_requirements.discard(_INGESTION)
+ if not self._outstanding_requirements:
+ self._terminate(packets.Kind.COMPLETION)
+
+ def abort(self, kind):
+ """See _interfaces.TerminationManager.abort for specification."""
+ if kind == self._local_failure:
+ self._has_failed_locally = True
+ if self._outstanding_requirements is not None:
+ self._terminate(kind)
+
+
+def front_termination_manager(work_pool, utility_pool, action, subscription):
+ """Creates a TerminationManager appropriate for front-side use.
+
+ Args:
+ work_pool: A thread pool in which customer work will be done.
+ utility_pool: A thread pool in which work utility work will be done.
+ action: An action to call on operation termination.
+ subscription: One of interfaces.FULL, interfaces.termination_only, or
+ interfaces.NONE.
+
+ Returns:
+ A TerminationManager appropriate for front-side use.
+ """
+ return _TerminationManager(
+ work_pool, utility_pool, action,
+ _FRONT_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else
+ _LISTENING_REQUIREMENTS, packets.Kind.SERVICED_FAILURE)
+
+
+def back_termination_manager(work_pool, utility_pool, action, subscription):
+ """Creates a TerminationManager appropriate for back-side use.
+
+ Args:
+ work_pool: A thread pool in which customer work will be done.
+ utility_pool: A thread pool in which work utility work will be done.
+ action: An action to call on operation termination.
+ subscription: One of interfaces.FULL, interfaces.termination_only, or
+ interfaces.NONE.
+
+ Returns:
+ A TerminationManager appropriate for back-side use.
+ """
+ return _TerminationManager(
+ work_pool, utility_pool, action,
+ _BACK_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else
+ _LISTENING_REQUIREMENTS, packets.Kind.SERVICER_FAILURE)
diff --git a/src/python/_framework/base/packets/_transmission.py b/src/python/_framework/base/packets/_transmission.py
new file mode 100644
index 0000000000..006128774d
--- /dev/null
+++ b/src/python/_framework/base/packets/_transmission.py
@@ -0,0 +1,393 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for packet transmission during an operation."""
+
+import abc
+
+from _framework.base import interfaces
+from _framework.base.packets import _constants
+from _framework.base.packets import _interfaces
+from _framework.base.packets import packets
+from _framework.foundation import callable_util
+
+_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
+
+_FRONT_TO_BACK_NO_TRANSMISSION_KINDS = (
+ packets.Kind.SERVICER_FAILURE,
+ )
+_BACK_TO_FRONT_NO_TRANSMISSION_KINDS = (
+ packets.Kind.CANCELLATION,
+ packets.Kind.SERVICED_FAILURE,
+ )
+
+
+class _Packetizer(object):
+ """Common specification of different packet-creating behavior."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def packetize(self, operation_id, sequence_number, payload, complete):
+ """Creates a packet indicating ordinary operation progress.
+
+ Args:
+ operation_id: The operation ID for the current operation.
+ sequence_number: A sequence number for the packet.
+ payload: A customer payload object. May be None if sequence_number is
+ zero or complete is true.
+ complete: A boolean indicating whether or not the packet should describe
+ itself as (but for a later indication of operation abortion) the last
+ packet to be sent.
+
+ Returns:
+ An object of an appropriate type suitable for transmission to the other
+ side of the operation.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def packetize_abortion(self, operation_id, sequence_number, kind):
+ """Creates a packet indicating that the operation is aborted.
+
+ Args:
+ operation_id: The operation ID for the current operation.
+ sequence_number: A sequence number for the packet.
+ kind: One of the values of packets.Kind indicating operational abortion.
+
+ Returns:
+ An object of an appropriate type suitable for transmission to the other
+ side of the operation, or None if transmission is not appropriate for
+ the given kind.
+ """
+ raise NotImplementedError()
+
+
+class _FrontPacketizer(_Packetizer):
+ """Front-side packet-creating behavior."""
+
+ def __init__(self, name, subscription, trace_id, timeout):
+ """Constructor.
+
+ Args:
+ name: The name of the operation.
+ subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
+ interfaces.NONE describing the interest the front has in packets sent
+ from the back.
+ trace_id: A uuid.UUID identifying a set of related operations to which
+ this operation belongs.
+ timeout: A length of time in seconds to allow for the entire operation.
+ """
+ self._name = name
+ self._subscription = subscription
+ self._trace_id = trace_id
+ self._timeout = timeout
+
+ def packetize(self, operation_id, sequence_number, payload, complete):
+ """See _Packetizer.packetize for specification."""
+ if sequence_number:
+ return packets.FrontToBackPacket(
+ operation_id, sequence_number,
+ packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION,
+ self._name, self._subscription, self._trace_id, payload,
+ self._timeout)
+ else:
+ return packets.FrontToBackPacket(
+ operation_id, 0,
+ packets.Kind.ENTIRE if complete else packets.Kind.COMMENCEMENT,
+ self._name, self._subscription, self._trace_id, payload,
+ self._timeout)
+
+ def packetize_abortion(self, operation_id, sequence_number, kind):
+ """See _Packetizer.packetize_abortion for specification."""
+ if kind in _FRONT_TO_BACK_NO_TRANSMISSION_KINDS:
+ return None
+ else:
+ return packets.FrontToBackPacket(
+ operation_id, sequence_number, kind, None, None, None, None, None)
+
+
+class _BackPacketizer(_Packetizer):
+ """Back-side packet-creating behavior."""
+
+ def packetize(self, operation_id, sequence_number, payload, complete):
+ """See _Packetizer.packetize for specification."""
+ return packets.BackToFrontPacket(
+ operation_id, sequence_number,
+ packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION,
+ payload)
+
+ def packetize_abortion(self, operation_id, sequence_number, kind):
+ """See _Packetizer.packetize_abortion for specification."""
+ if kind in _BACK_TO_FRONT_NO_TRANSMISSION_KINDS:
+ return None
+ else:
+ return packets.BackToFrontPacket(
+ operation_id, sequence_number, kind, None)
+
+
+class TransmissionManager(_interfaces.TransmissionManager):
+ """A _interfaces.TransmissionManager on which other managers may be set."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def set_ingestion_and_expiration_managers(
+ self, ingestion_manager, expiration_manager):
+ """Sets two of the other managers with which this manager may interact.
+
+ Args:
+ ingestion_manager: The _interfaces.IngestionManager associated with the
+ current operation.
+ expiration_manager: The _interfaces.ExpirationManager associated with the
+ current operation.
+ """
+ raise NotImplementedError()
+
+
+class _EmptyTransmissionManager(TransmissionManager):
+ """A completely no-operative _interfaces.TransmissionManager."""
+
+ def set_ingestion_and_expiration_managers(
+ self, ingestion_manager, expiration_manager):
+ """See overriden method for specification."""
+
+ def inmit(self, emission, complete):
+ """See _interfaces.TransmissionManager.inmit for specification."""
+
+ def abort(self, category):
+ """See _interfaces.TransmissionManager.abort for specification."""
+
+
+class _TransmittingTransmissionManager(TransmissionManager):
+ """A TransmissionManager implementation that sends packets."""
+
+ def __init__(
+ self, lock, pool, callback, operation_id, packetizer,
+ termination_manager):
+ """Constructor.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting packets will be
+ performed.
+ callback: A callable that accepts packets and sends them to the other side
+ of the operation.
+ operation_id: The operation's ID.
+ packetizer: A _Packetizer for packet creation.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+ """
+ self._lock = lock
+ self._pool = pool
+ self._callback = callback
+ self._operation_id = operation_id
+ self._packetizer = packetizer
+ self._termination_manager = termination_manager
+ self._ingestion_manager = None
+ self._expiration_manager = None
+
+ self._emissions = []
+ self._emission_complete = False
+ self._kind = None
+ self._lowest_unused_sequence_number = 0
+ self._transmitting = False
+
+ def set_ingestion_and_expiration_managers(
+ self, ingestion_manager, expiration_manager):
+ """See overridden method for specification."""
+ self._ingestion_manager = ingestion_manager
+ self._expiration_manager = expiration_manager
+
+ def _lead_packet(self, emission, complete):
+ """Creates a packet suitable for leading off the transmission loop.
+
+ Args:
+ emission: A customer payload object to be sent to the other side of the
+ operation.
+ complete: Whether or not the sequence of customer payloads ends with
+ the passed object.
+
+ Returns:
+ A packet with which to lead off the transmission loop.
+ """
+ sequence_number = self._lowest_unused_sequence_number
+ self._lowest_unused_sequence_number += 1
+ return self._packetizer.packetize(
+ self._operation_id, sequence_number, emission, complete)
+
+ def _abortive_response_packet(self, kind):
+ """Creates a packet indicating operation abortion.
+
+ Args:
+ kind: One of the values of packets.Kind indicating operational abortion.
+
+ Returns:
+ A packet indicating operation abortion.
+ """
+ packet = self._packetizer.packetize_abortion(
+ self._operation_id, self._lowest_unused_sequence_number, kind)
+ if packet is None:
+ return None
+ else:
+ self._lowest_unused_sequence_number += 1
+ return packet
+
+ def _next_packet(self):
+ """Creates the next packet to be sent to the other side of the operation.
+
+ Returns:
+ A (completed, packet) tuple comprised of a boolean indicating whether or
+ not the sequence of packets has completed normally and a packet to send
+ to the other side if the sequence of packets hasn't completed. The tuple
+ will never have both a True first element and a non-None second element.
+ """
+ if self._emissions is None:
+ return False, None
+ elif self._kind is None:
+ if self._emissions:
+ payload = self._emissions.pop(0)
+ complete = self._emission_complete and not self._emissions
+ sequence_number = self._lowest_unused_sequence_number
+ self._lowest_unused_sequence_number += 1
+ return complete, self._packetizer.packetize(
+ self._operation_id, sequence_number, payload, complete)
+ else:
+ return self._emission_complete, None
+ else:
+ packet = self._abortive_response_packet(self._kind)
+ self._emissions = None
+ return False, None if packet is None else packet
+
+ def _transmit(self, packet):
+ """Commences the transmission loop sending packets.
+
+ Args:
+ packet: A packet to be sent to the other side of the operation.
+ """
+ def transmit(packet):
+ while True:
+ transmission_outcome = callable_util.call_logging_exceptions(
+ self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, packet)
+ if transmission_outcome.exception is None:
+ with self._lock:
+ complete, packet = self._next_packet()
+ if packet is None:
+ if complete:
+ self._termination_manager.transmission_complete()
+ self._transmitting = False
+ return
+ else:
+ with self._lock:
+ self._emissions = None
+ self._termination_manager.abort(packets.Kind.TRANSMISSION_FAILURE)
+ self._ingestion_manager.abort()
+ self._expiration_manager.abort()
+ self._transmitting = False
+ return
+
+ self._pool.submit(callable_util.with_exceptions_logged(
+ transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), packet)
+ self._transmitting = True
+
+ def inmit(self, emission, complete):
+ """See _interfaces.TransmissionManager.inmit for specification."""
+ if self._emissions is not None and self._kind is None:
+ self._emission_complete = complete
+ if self._transmitting:
+ self._emissions.append(emission)
+ else:
+ self._transmit(self._lead_packet(emission, complete))
+
+ def abort(self, kind):
+ """See _interfaces.TransmissionManager.abort for specification."""
+ if self._emissions is not None and self._kind is None:
+ self._kind = kind
+ if not self._transmitting:
+ packet = self._abortive_response_packet(kind)
+ self._emissions = None
+ if packet is not None:
+ self._transmit(packet)
+
+
+def front_transmission_manager(
+ lock, pool, callback, operation_id, name, subscription, trace_id, timeout,
+ termination_manager):
+ """Creates a TransmissionManager appropriate for front-side use.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting packets will be
+ performed.
+ callback: A callable that accepts packets and sends them to the other side
+ of the operation.
+ operation_id: The operation's ID.
+ name: The name of the operation.
+ subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
+ interfaces.NONE describing the interest the front has in packets sent
+ from the back.
+ trace_id: A uuid.UUID identifying a set of related operations to which
+ this operation belongs.
+ timeout: A length of time in seconds to allow for the entire operation.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+
+ Returns:
+ A TransmissionManager appropriate for front-side use.
+ """
+ return _TransmittingTransmissionManager(
+ lock, pool, callback, operation_id, _FrontPacketizer(
+ name, subscription, trace_id, timeout),
+ termination_manager)
+
+
+def back_transmission_manager(
+ lock, pool, callback, operation_id, termination_manager, subscription):
+ """Creates a TransmissionManager appropriate for back-side use.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ pool: A thread pool in which the work of transmitting packets will be
+ performed.
+ callback: A callable that accepts packets and sends them to the other side
+ of the operation.
+ operation_id: The operation's ID.
+ termination_manager: The _interfaces.TerminationManager associated with
+ this operation.
+ subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
+ interfaces.NONE describing the interest the front has in packets sent from
+ the back.
+
+ Returns:
+ A TransmissionManager appropriate for back-side use.
+ """
+ if subscription == interfaces.NONE:
+ return _EmptyTransmissionManager()
+ else:
+ return _TransmittingTransmissionManager(
+ lock, pool, callback, operation_id, _BackPacketizer(),
+ termination_manager)
diff --git a/src/python/_framework/base/packets/implementations.py b/src/python/_framework/base/packets/implementations.py
new file mode 100644
index 0000000000..2f07054d4d
--- /dev/null
+++ b/src/python/_framework/base/packets/implementations.py
@@ -0,0 +1,77 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Entry points into the packet-exchange-based implementation the base layer."""
+
+# interfaces is referenced from specification in this module.
+from _framework.base.packets import _ends
+from _framework.base.packets import interfaces # pylint: disable=unused-import
+
+
+def front(work_pool, transmission_pool, utility_pool):
+ """Factory function for creating interfaces.Fronts.
+
+ Args:
+ work_pool: A thread pool to be used for doing work within the created Front
+ object.
+ transmission_pool: A thread pool to be used within the created Front object
+ for transmitting values to some Back object.
+ utility_pool: A thread pool to be used within the created Front object for
+ utility tasks.
+
+ Returns:
+ An interfaces.Front.
+ """
+ return _ends.Front(work_pool, transmission_pool, utility_pool)
+
+
+def back(
+ servicer, work_pool, transmission_pool, utility_pool, default_timeout,
+ maximum_timeout):
+ """Factory function for creating interfaces.Backs.
+
+ Args:
+ servicer: An interfaces.Servicer for servicing operations.
+ work_pool: A thread pool to be used for doing work within the created Back
+ object.
+ transmission_pool: A thread pool to be used within the created Back object
+ for transmitting values to some Front object.
+ utility_pool: A thread pool to be used within the created Back object for
+ utility tasks.
+ default_timeout: A length of time in seconds to be used as the default
+ time alloted for a single operation.
+ maximum_timeout: A length of time in seconds to be used as the maximum
+ time alloted for a single operation.
+
+ Returns:
+ An interfaces.Back.
+ """
+ return _ends.Back(
+ servicer, work_pool, transmission_pool, utility_pool, default_timeout,
+ maximum_timeout)
diff --git a/src/python/_framework/base/packets/implementations_test.py b/src/python/_framework/base/packets/implementations_test.py
new file mode 100644
index 0000000000..8bb5353176
--- /dev/null
+++ b/src/python/_framework/base/packets/implementations_test.py
@@ -0,0 +1,80 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests for _framework.base.packets.implementations."""
+
+import unittest
+
+from _framework.base import interfaces_test
+from _framework.base import util
+from _framework.base.packets import implementations
+from _framework.foundation import logging_pool
+
+POOL_MAX_WORKERS = 100
+DEFAULT_TIMEOUT = 30
+MAXIMUM_TIMEOUT = 60
+
+
+class ImplementationsTest(
+ interfaces_test.FrontAndBackTest, unittest.TestCase):
+
+ def setUp(self):
+ self.memory_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS)
+ self.front_work_pool = logging_pool.pool(POOL_MAX_WORKERS)
+ self.front_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS)
+ self.front_utility_pool = logging_pool.pool(POOL_MAX_WORKERS)
+ self.back_work_pool = logging_pool.pool(POOL_MAX_WORKERS)
+ self.back_transmission_pool = logging_pool.pool(POOL_MAX_WORKERS)
+ self.back_utility_pool = logging_pool.pool(POOL_MAX_WORKERS)
+ self.test_pool = logging_pool.pool(POOL_MAX_WORKERS)
+ self.test_servicer = interfaces_test.TestServicer(self.test_pool)
+ self.front = implementations.front(
+ self.front_work_pool, self.front_transmission_pool,
+ self.front_utility_pool)
+ self.back = implementations.back(
+ self.test_servicer, self.back_work_pool, self.back_transmission_pool,
+ self.back_utility_pool, DEFAULT_TIMEOUT, MAXIMUM_TIMEOUT)
+ self.front.join_rear_link(self.back)
+ self.back.join_fore_link(self.front)
+
+ def tearDown(self):
+ util.wait_for_idle(self.back)
+ util.wait_for_idle(self.front)
+ self.memory_transmission_pool.shutdown(wait=True)
+ self.front_work_pool.shutdown(wait=True)
+ self.front_transmission_pool.shutdown(wait=True)
+ self.front_utility_pool.shutdown(wait=True)
+ self.back_work_pool.shutdown(wait=True)
+ self.back_transmission_pool.shutdown(wait=True)
+ self.back_utility_pool.shutdown(wait=True)
+ self.test_pool.shutdown(wait=True)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/python/_framework/base/packets/in_memory.py b/src/python/_framework/base/packets/in_memory.py
new file mode 100644
index 0000000000..17daf3acf7
--- /dev/null
+++ b/src/python/_framework/base/packets/in_memory.py
@@ -0,0 +1,108 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Entry points into the packet-exchange-based implementation the base layer."""
+
+import threading
+
+from _framework.base.packets import _constants
+from _framework.base.packets import interfaces
+from _framework.foundation import callable_util
+
+
+class _Serializer(object):
+ """A utility for serializing values that may arrive concurrently."""
+
+ def __init__(self, pool):
+ self._lock = threading.Lock()
+ self._pool = pool
+ self._sink = None
+ self._spinning = False
+ self._values = []
+
+ def _spin(self, sink, value):
+ while True:
+ sink(value)
+ with self._lock:
+ if self._sink is None or not self._values:
+ self._spinning = False
+ return
+ else:
+ sink, value = self._sink, self._values.pop(0)
+
+ def set_sink(self, sink):
+ with self._lock:
+ self._sink = sink
+ if sink is not None and self._values and not self._spinning:
+ self._spinning = True
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ self._spin, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ sink, self._values.pop(0))
+
+ def add_value(self, value):
+ with self._lock:
+ if self._sink and not self._spinning:
+ self._spinning = True
+ self._pool.submit(
+ callable_util.with_exceptions_logged(
+ self._spin, _constants.INTERNAL_ERROR_LOG_MESSAGE),
+ self._sink, value)
+ else:
+ self._values.append(value)
+
+
+class Link(interfaces.ForeLink, interfaces.RearLink):
+ """A trivial implementation of interfaces.ForeLink and interfaces.RearLink."""
+
+ def __init__(self, pool):
+ """Constructor.
+
+ Args:
+ pool: A thread pool to be used for serializing ticket exchange in each
+ direction.
+ """
+ self._front_to_back = _Serializer(pool)
+ self._back_to_front = _Serializer(pool)
+
+ def join_fore_link(self, fore_link):
+ """See interfaces.RearLink.join_fore_link for specification."""
+ self._back_to_front.set_sink(fore_link.accept_back_to_front_ticket)
+
+ def join_rear_link(self, rear_link):
+ """See interfaces.ForeLink.join_rear_link for specification."""
+ self._front_to_back.set_sink(rear_link.accept_front_to_back_ticket)
+
+ def accept_front_to_back_ticket(self, ticket):
+ """See interfaces.ForeLink.accept_front_to_back_ticket for specification."""
+ self._front_to_back.add_value(ticket)
+
+ def accept_back_to_front_ticket(self, ticket):
+ """See interfaces.RearLink.accept_back_to_front_ticket for specification."""
+ self._back_to_front.add_value(ticket)
diff --git a/src/python/_framework/base/packets/interfaces.py b/src/python/_framework/base/packets/interfaces.py
new file mode 100644
index 0000000000..99f9e87772
--- /dev/null
+++ b/src/python/_framework/base/packets/interfaces.py
@@ -0,0 +1,84 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Interfaces defined and used by the base layer of RPC Framework."""
+
+import abc
+
+# packets is referenced from specifications in this module.
+from _framework.base import interfaces
+from _framework.base.packets import packets # pylint: disable=unused-import
+
+
+class ForeLink(object):
+ """Accepts back-to-front tickets and emits front-to-back tickets."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def accept_back_to_front_ticket(self, ticket):
+ """Accept a packets.BackToFrontPacket.
+
+ Args:
+ ticket: Any packets.BackToFrontPacket.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def join_rear_link(self, rear_link):
+ """Mates this object with a peer with which it will exchange tickets."""
+ raise NotImplementedError()
+
+
+class RearLink(object):
+ """Accepts front-to-back tickets and emits back-to-front tickets."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def accept_front_to_back_ticket(self, ticket):
+ """Accepts a packets.FrontToBackPacket.
+
+ Args:
+ ticket: Any packets.FrontToBackPacket.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def join_fore_link(self, fore_link):
+ """Mates this object with a peer with which it will exchange tickets."""
+ raise NotImplementedError()
+
+
+class Front(ForeLink, interfaces.Front):
+ """Clientish objects that operate by sending and receiving tickets."""
+ __metaclass__ = abc.ABCMeta
+
+
+class Back(RearLink, interfaces.Back):
+ """Serverish objects that operate by sending and receiving tickets."""
+ __metaclass__ = abc.ABCMeta
diff --git a/src/python/_framework/base/packets/null.py b/src/python/_framework/base/packets/null.py
new file mode 100644
index 0000000000..9b40a00505
--- /dev/null
+++ b/src/python/_framework/base/packets/null.py
@@ -0,0 +1,56 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Null links that ignore tickets passed to them."""
+
+from _framework.base.packets import interfaces
+
+
+class _NullForeLink(interfaces.ForeLink):
+ """A do-nothing ForeLink."""
+
+ def accept_back_to_front_ticket(self, ticket):
+ pass
+
+ def join_rear_link(self, rear_link):
+ raise NotImplementedError()
+
+
+class _NullRearLink(interfaces.RearLink):
+ """A do-nothing RearLink."""
+
+ def accept_front_to_back_ticket(self, ticket):
+ pass
+
+ def join_fore_link(self, fore_link):
+ raise NotImplementedError()
+
+
+NULL_FORE_LINK = _NullForeLink()
+NULL_REAR_LINK = _NullRearLink()
diff --git a/src/python/_framework/base/packets/packets.py b/src/python/_framework/base/packets/packets.py
new file mode 100644
index 0000000000..1315ca650e
--- /dev/null
+++ b/src/python/_framework/base/packets/packets.py
@@ -0,0 +1,112 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Packets used between fronts and backs."""
+
+import collections
+import enum
+
+# interfaces is referenced from specifications in this module.
+from _framework.base import interfaces # pylint: disable=unused-import
+
+
+@enum.unique
+class Kind(enum.Enum):
+ """Identifies the overall kind of a ticket."""
+
+ COMMENCEMENT = 'commencement'
+ CONTINUATION = 'continuation'
+ COMPLETION = 'completion'
+ ENTIRE = 'entire'
+ CANCELLATION = 'cancellation'
+ EXPIRATION = 'expiration'
+ SERVICER_FAILURE = 'servicer failure'
+ SERVICED_FAILURE = 'serviced failure'
+ RECEPTION_FAILURE = 'reception failure'
+ TRANSMISSION_FAILURE = 'transmission failure'
+
+
+class FrontToBackPacket(
+ collections.namedtuple(
+ 'FrontToBackPacket',
+ ['operation_id', 'sequence_number', 'kind', 'name', 'subscription',
+ 'trace_id', 'payload', 'timeout'])):
+ """A sum type for all values sent from a front to a back.
+
+ Attributes:
+ operation_id: A unique-with-respect-to-equality hashable object identifying
+ a particular operation.
+ sequence_number: A zero-indexed integer sequence number identifying the
+ packet's place among all the packets sent from front to back for this
+ particular operation. Must be zero if kind is Kind.COMMENCEMENT or
+ Kind.ENTIRE. Must be positive for any other kind.
+ kind: One of Kind.COMMENCEMENT, Kind.CONTINUATION, Kind.COMPLETION,
+ Kind.ENTIRE, Kind.CANCELLATION, Kind.EXPIRATION, Kind.SERVICED_FAILURE,
+ Kind.RECEPTION_FAILURE, or Kind.TRANSMISSION_FAILURE.
+ name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT
+ or Kind.ENTIRE. Must be None for any other kind.
+ subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
+ interfaces.NONE describing the interest the front has in packets sent from
+ the back. Must be present if kind is Kind.COMMENCEMENT or Kind.ENTIRE.
+ Must be None for any other kind.
+ trace_id: A uuid.UUID identifying a set of related operations to which this
+ operation belongs. May be None.
+ payload: A customer payload object. Must be present if kind is
+ Kind.CONTINUATION. Must be None if kind is Kind.CANCELLATION. May be None
+ for any other kind.
+ timeout: An optional length of time (measured from the beginning of the
+ operation) to allow for the entire operation. If None, a default value on
+ the back will be used. If present and excessively large, the back may
+ limit the operation to a smaller duration of its choice. May be present
+ for any ticket kind; setting a value on a later ticket allows fronts
+ to request time extensions (or even time reductions!) on in-progress
+ operations.
+ """
+
+
+class BackToFrontPacket(
+ collections.namedtuple(
+ 'BackToFrontPacket',
+ ['operation_id', 'sequence_number', 'kind', 'payload'])):
+ """A sum type for all values sent from a back to a front.
+
+ Attributes:
+ operation_id: A unique-with-respect-to-equality hashable object identifying
+ a particular operation.
+ sequence_number: A zero-indexed integer sequence number identifying the
+ packet's place among all the packets sent from back to front for this
+ particular operation.
+ kind: One of Kind.CONTINUATION, Kind.COMPLETION, Kind.EXPIRATION,
+ Kind.SERVICER_FAILURE, Kind.RECEPTION_FAILURE, or
+ Kind.TRANSMISSION_FAILURE.
+ payload: A customer payload object. Must be present if kind is
+ Kind.CONTINUATION. May be None if kind is Kind.COMPLETION. Must be None if
+ kind is Kind.EXPIRATION, Kind.SERVICER_FAILURE, Kind.RECEPTION_FAILURE, or
+ Kind.TRANSMISSION_FAILURE.
+ """