aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-03-16 17:12:43 -0700
committerGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-03-16 17:12:43 -0700
commitafdc24ae2b38318de7721fa8ab4684e460104ed7 (patch)
tree5f5df7b42f66540f595573411f736a64eda04e76 /src
parentea34b1f078ed8768b7adf96a01e93679ac6f24b7 (diff)
parent8d81365a3c03184683782cdd2dd8a9c934c790c7 (diff)
Merge pull request #1055 from nathanielmanistaatgoogle/python-refactoring
Python refactoring
Diffstat (limited to 'src')
-rw-r--r--src/python/src/grpc/framework/base/packets/_cancellation.py5
-rw-r--r--src/python/src/grpc/framework/base/packets/_context.py8
-rw-r--r--src/python/src/grpc/framework/base/packets/_emission.py22
-rw-r--r--src/python/src/grpc/framework/base/packets/_ends.py16
-rw-r--r--src/python/src/grpc/framework/base/packets/_expiration.py6
-rw-r--r--src/python/src/grpc/framework/base/packets/_ingestion.py40
-rw-r--r--src/python/src/grpc/framework/base/packets/_interfaces.py8
-rw-r--r--src/python/src/grpc/framework/base/packets/_reception.py55
-rw-r--r--src/python/src/grpc/framework/base/packets/_termination.py48
-rw-r--r--src/python/src/grpc/framework/base/packets/_transmission.py63
10 files changed, 131 insertions, 140 deletions
diff --git a/src/python/src/grpc/framework/base/packets/_cancellation.py b/src/python/src/grpc/framework/base/packets/_cancellation.py
index 2373c78842..4a0ced1440 100644
--- a/src/python/src/grpc/framework/base/packets/_cancellation.py
+++ b/src/python/src/grpc/framework/base/packets/_cancellation.py
@@ -29,6 +29,7 @@
"""State and behavior for operation cancellation."""
+from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces
from grpc.framework.base.packets import packets
@@ -58,7 +59,7 @@ class CancellationManager(_interfaces.CancellationManager):
def cancel(self):
"""See _interfaces.CancellationManager.cancel for specification."""
with self._lock:
- self._termination_manager.abort(packets.Kind.CANCELLATION)
- self._transmission_manager.abort(packets.Kind.CANCELLATION)
+ self._termination_manager.abort(base_interfaces.Outcome.CANCELLED)
+ self._transmission_manager.abort(base_interfaces.Outcome.CANCELLED)
self._ingestion_manager.abort()
self._expiration_manager.abort()
diff --git a/src/python/src/grpc/framework/base/packets/_context.py b/src/python/src/grpc/framework/base/packets/_context.py
index e09d4a60c9..45241c639e 100644
--- a/src/python/src/grpc/framework/base/packets/_context.py
+++ b/src/python/src/grpc/framework/base/packets/_context.py
@@ -31,10 +31,9 @@
import time
-# _interfaces and packets are referenced from specification in this module.
+# _interfaces is referenced from specification in this module.
from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces # pylint: disable=unused-import
-from grpc.framework.base.packets import packets # pylint: disable=unused-import
class OperationContext(base_interfaces.OperationContext):
@@ -48,8 +47,9 @@ class OperationContext(base_interfaces.OperationContext):
Args:
lock: The operation-wide lock.
operation_id: An object identifying the operation.
- local_failure: Whichever one of packets.Kind.SERVICED_FAILURE or
- packets.Kind.SERVICER_FAILURE describes local failure of customer code.
+ local_failure: Whichever one of base_interfaces.Outcome.SERVICED_FAILURE
+ or base_interfaces.Outcome.SERVICER_FAILURE describes local failure of
+ customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
diff --git a/src/python/src/grpc/framework/base/packets/_emission.py b/src/python/src/grpc/framework/base/packets/_emission.py
index 9446b8665d..cfc9e40a24 100644
--- a/src/python/src/grpc/framework/base/packets/_emission.py
+++ b/src/python/src/grpc/framework/base/packets/_emission.py
@@ -29,29 +29,29 @@
"""State and behavior for handling emitted values."""
-# packets is referenced from specifications in this module.
+from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces
-from grpc.framework.base.packets import packets # pylint: disable=unused-import
class _EmissionManager(_interfaces.EmissionManager):
"""An implementation of _interfaces.EmissionManager."""
def __init__(
- self, lock, failure_kind, termination_manager, transmission_manager):
+ self, lock, failure_outcome, termination_manager, transmission_manager):
"""Constructor.
Args:
lock: The operation-wide lock.
- failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or
- packets.Kind.SERVICER_FAILURE describes this object's methods being
- called inappropriately by customer code.
+ failure_outcome: Whichever one of
+ base_interfaces.Outcome.SERVICED_FAILURE or
+ base_interfaces.Outcome.SERVICER_FAILURE describes this object's
+ methods being called inappropriately by customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
"""
self._lock = lock
- self._failure_kind = failure_kind
+ self._failure_outcome = failure_outcome
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._ingestion_manager = None
@@ -65,8 +65,8 @@ class _EmissionManager(_interfaces.EmissionManager):
self._expiration_manager = expiration_manager
def _abort(self):
- self._termination_manager.abort(self._failure_kind)
- self._transmission_manager.abort(self._failure_kind)
+ self._termination_manager.abort(self._failure_outcome)
+ self._transmission_manager.abort(self._failure_outcome)
self._ingestion_manager.abort()
self._expiration_manager.abort()
@@ -106,7 +106,7 @@ def front_emission_manager(lock, termination_manager, transmission_manager):
An _interfaces.EmissionManager appropriate for front-side use.
"""
return _EmissionManager(
- lock, packets.Kind.SERVICED_FAILURE, termination_manager,
+ lock, base_interfaces.Outcome.SERVICED_FAILURE, termination_manager,
transmission_manager)
@@ -122,5 +122,5 @@ def back_emission_manager(lock, termination_manager, transmission_manager):
An _interfaces.EmissionManager appropriate for back-side use.
"""
return _EmissionManager(
- lock, packets.Kind.SERVICER_FAILURE, termination_manager,
+ lock, base_interfaces.Outcome.SERVICER_FAILURE, termination_manager,
transmission_manager)
diff --git a/src/python/src/grpc/framework/base/packets/_ends.py b/src/python/src/grpc/framework/base/packets/_ends.py
index ac369c4fbd..614d1f666e 100644
--- a/src/python/src/grpc/framework/base/packets/_ends.py
+++ b/src/python/src/grpc/framework/base/packets/_ends.py
@@ -50,16 +50,6 @@ from grpc.framework.foundation import callable_util
_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
-_OPERATION_OUTCOMES = (
- base_interfaces.Outcome.COMPLETED,
- base_interfaces.Outcome.CANCELLED,
- base_interfaces.Outcome.EXPIRED,
- base_interfaces.Outcome.RECEPTION_FAILURE,
- base_interfaces.Outcome.TRANSMISSION_FAILURE,
- base_interfaces.Outcome.SERVICER_FAILURE,
- base_interfaces.Outcome.SERVICED_FAILURE,
- )
-
class _EasyOperation(base_interfaces.Operation):
"""A trivial implementation of base_interfaces.Operation."""
@@ -98,7 +88,7 @@ class _Endlette(object):
# indicates an in-progress fire-and-forget operation for which the customer
# has chosen to ignore results.
self._operations = {}
- self._stats = {outcome: 0 for outcome in _OPERATION_OUTCOMES}
+ self._stats = {outcome: 0 for outcome in base_interfaces.Outcome}
self._idle_actions = []
def terminal_action(self, operation_id):
@@ -198,7 +188,7 @@ def _front_operate(
lock, transmission_pool, callback, operation_id, name,
subscription.kind, trace_id, timeout, termination_manager)
operation_context = _context.OperationContext(
- lock, operation_id, packets.Kind.SERVICED_FAILURE,
+ lock, operation_id, base_interfaces.Outcome.SERVICED_FAILURE,
termination_manager, transmission_manager)
emission_manager = _emission.front_emission_manager(
lock, termination_manager, transmission_manager)
@@ -327,7 +317,7 @@ def _back_operate(
lock, transmission_pool, callback, ticket.operation_id,
termination_manager, ticket.subscription)
operation_context = _context.OperationContext(
- lock, ticket.operation_id, packets.Kind.SERVICER_FAILURE,
+ lock, ticket.operation_id, base_interfaces.Outcome.SERVICER_FAILURE,
termination_manager, transmission_manager)
emission_manager = _emission.back_emission_manager(
lock, termination_manager, transmission_manager)
diff --git a/src/python/src/grpc/framework/base/packets/_expiration.py b/src/python/src/grpc/framework/base/packets/_expiration.py
index f58db28aa2..a9ecaeaa63 100644
--- a/src/python/src/grpc/framework/base/packets/_expiration.py
+++ b/src/python/src/grpc/framework/base/packets/_expiration.py
@@ -31,8 +31,8 @@
import time
+from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces
-from grpc.framework.base.packets import packets
from grpc.framework.foundation import later
@@ -73,8 +73,8 @@ class _ExpirationManager(_interfaces.ExpirationManager):
with self._lock:
if self._future is not None and index == self._index:
self._future = None
- self._termination_manager.abort(packets.Kind.EXPIRATION)
- self._transmission_manager.abort(packets.Kind.EXPIRATION)
+ self._termination_manager.abort(base_interfaces.Outcome.EXPIRED)
+ self._transmission_manager.abort(base_interfaces.Outcome.EXPIRED)
self._ingestion_manager.abort()
def start(self):
diff --git a/src/python/src/grpc/framework/base/packets/_ingestion.py b/src/python/src/grpc/framework/base/packets/_ingestion.py
index a750195ccb..c5c08fd98e 100644
--- a/src/python/src/grpc/framework/base/packets/_ingestion.py
+++ b/src/python/src/grpc/framework/base/packets/_ingestion.py
@@ -206,7 +206,7 @@ class _IngestionManager(_interfaces.IngestionManager):
"""An implementation of _interfaces.IngestionManager."""
def __init__(
- self, lock, pool, consumer_creator, failure_kind, termination_manager,
+ self, lock, pool, consumer_creator, failure_outcome, termination_manager,
transmission_manager):
"""Constructor.
@@ -216,8 +216,10 @@ class _IngestionManager(_interfaces.IngestionManager):
consumer_creator: A _ConsumerCreator wrapping the portion of customer code
that when called returns the stream.Consumer with which the customer
code will ingest payload values.
- failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or
- packets.Kind.SERVICER_FAILURE describes local failure of customer code.
+ failure_outcome: Whichever one of
+ interfaces.Outcome.SERVICED_FAILURE or
+ interfaces.Outcome.SERVICER_FAILURE describes local failure of
+ customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
@@ -225,7 +227,7 @@ class _IngestionManager(_interfaces.IngestionManager):
self._lock = lock
self._pool = pool
self._consumer_creator = consumer_creator
- self._failure_kind = failure_kind
+ self._failure_outcome = failure_outcome
self._termination_manager = termination_manager
self._transmission_manager = transmission_manager
self._expiration_manager = None
@@ -299,12 +301,12 @@ class _IngestionManager(_interfaces.IngestionManager):
else:
with self._lock:
if self._pending_ingestion is not None:
- self._abort_and_notify(self._failure_kind)
+ self._abort_and_notify(self._failure_outcome)
self._processing = False
return
else:
with self._lock:
- self._abort_and_notify(self._failure_kind)
+ self._abort_and_notify(self._failure_outcome)
self._processing = False
return
@@ -316,16 +318,16 @@ class _IngestionManager(_interfaces.IngestionManager):
_CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE, requirement)
if consumer_creation_outcome.return_value is None:
with self._lock:
- self._abort_and_notify(self._failure_kind)
+ self._abort_and_notify(self._failure_outcome)
self._processing = False
elif consumer_creation_outcome.return_value.remote_error:
with self._lock:
- self._abort_and_notify(packets.Kind.RECEPTION_FAILURE)
+ self._abort_and_notify(interfaces.Outcome.RECEPTION_FAILURE)
self._processing = False
elif consumer_creation_outcome.return_value.abandoned:
with self._lock:
if self._pending_ingestion is not None:
- self._abort_and_notify(self._failure_kind)
+ self._abort_and_notify(self._failure_outcome)
self._processing = False
else:
wrapped_ingestion_consumer = _WrappedConsumer(
@@ -346,7 +348,7 @@ class _IngestionManager(_interfaces.IngestionManager):
def consume(self, payload):
if self._ingestion_complete:
- self._abort_and_notify(self._failure_kind)
+ self._abort_and_notify(self._failure_outcome)
elif self._pending_ingestion is not None:
if self._processing:
self._pending_ingestion.append(payload)
@@ -359,7 +361,7 @@ class _IngestionManager(_interfaces.IngestionManager):
def terminate(self):
if self._ingestion_complete:
- self._abort_and_notify(self._failure_kind)
+ self._abort_and_notify(self._failure_outcome)
else:
self._ingestion_complete = True
if self._pending_ingestion is not None and not self._processing:
@@ -371,7 +373,7 @@ class _IngestionManager(_interfaces.IngestionManager):
def consume_and_terminate(self, payload):
if self._ingestion_complete:
- self._abort_and_notify(self._failure_kind)
+ self._abort_and_notify(self._failure_outcome)
else:
self._ingestion_complete = True
if self._pending_ingestion is not None:
@@ -397,19 +399,20 @@ def front_ingestion_manager(
Args:
lock: The operation-wide lock.
pool: A thread pool in which to execute customer code.
- subscription: A base_interfaces.ServicedSubscription indicating the
+ subscription: A interfaces.ServicedSubscription indicating the
customer's interest in the results of the operation.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
- operation_context: A base_interfaces.OperationContext for the operation.
+ operation_context: A interfaces.OperationContext for the operation.
Returns:
An IngestionManager appropriate for front-side use.
"""
ingestion_manager = _IngestionManager(
lock, pool, _FrontConsumerCreator(subscription, operation_context),
- packets.Kind.SERVICED_FAILURE, termination_manager, transmission_manager)
+ interfaces.Outcome.SERVICED_FAILURE, termination_manager,
+ transmission_manager)
ingestion_manager.start(None)
return ingestion_manager
@@ -422,11 +425,11 @@ def back_ingestion_manager(
Args:
lock: The operation-wide lock.
pool: A thread pool in which to execute customer code.
- servicer: A base_interfaces.Servicer for servicing the operation.
+ servicer: A interfaces.Servicer for servicing the operation.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
- operation_context: A base_interfaces.OperationContext for the operation.
+ operation_context: A interfaces.OperationContext for the operation.
emission_consumer: The _interfaces.EmissionConsumer for the operation.
Returns:
@@ -435,5 +438,6 @@ def back_ingestion_manager(
ingestion_manager = _IngestionManager(
lock, pool, _BackConsumerCreator(
servicer, operation_context, emission_consumer),
- packets.Kind.SERVICER_FAILURE, termination_manager, transmission_manager)
+ interfaces.Outcome.SERVICER_FAILURE, termination_manager,
+ transmission_manager)
return ingestion_manager
diff --git a/src/python/src/grpc/framework/base/packets/_interfaces.py b/src/python/src/grpc/framework/base/packets/_interfaces.py
index 64dc33e8d5..64184bdf7c 100644
--- a/src/python/src/grpc/framework/base/packets/_interfaces.py
+++ b/src/python/src/grpc/framework/base/packets/_interfaces.py
@@ -83,11 +83,11 @@ class TerminationManager(object):
raise NotImplementedError()
@abc.abstractmethod
- def abort(self, kind):
+ def abort(self, outcome):
"""Indicates that the operation must abort for the indicated reason.
Args:
- kind: A value of packets.Kind indicating operation abortion.
+ outcome: A base_interfaces.Outcome indicating operation abortion.
"""
raise NotImplementedError()
@@ -109,11 +109,11 @@ class TransmissionManager(object):
raise NotImplementedError()
@abc.abstractmethod
- def abort(self, kind):
+ def abort(self, outcome):
"""Indicates that the operation has aborted for the indicated reason.
Args:
- kind: A value of packets.Kind indicating operation abortion.
+ outcome: A base_interfaces.Outcome indicating operation abortion.
"""
raise NotImplementedError()
diff --git a/src/python/src/grpc/framework/base/packets/_reception.py b/src/python/src/grpc/framework/base/packets/_reception.py
index 6e2c9c0a4e..becbef828d 100644
--- a/src/python/src/grpc/framework/base/packets/_reception.py
+++ b/src/python/src/grpc/framework/base/packets/_reception.py
@@ -31,6 +31,7 @@
import abc
+from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base.packets import _interfaces
from grpc.framework.base.packets import packets
@@ -72,11 +73,11 @@ class _Receiver(object):
def _abort(
- category, termination_manager, transmission_manager, ingestion_manager,
+ outcome, termination_manager, transmission_manager, ingestion_manager,
expiration_manager):
- """Indicates abortion with the given category to the given managers."""
- termination_manager.abort(category)
- transmission_manager.abort(category)
+ """Indicates abortion with the given outcome to the given managers."""
+ termination_manager.abort(outcome)
+ transmission_manager.abort(outcome)
ingestion_manager.abort()
expiration_manager.abort()
@@ -88,9 +89,9 @@ def _abort_if_abortive(
Args:
packet: A just-arrived packet.
- abortive: A callable that takes a packet and returns an operation category
- indicating that the operation should be aborted or None indicating that
- the operation should not be aborted.
+ abortive: A callable that takes a packet and returns a
+ base_interfaces.Outcome indicating that the operation should be aborted
+ or None indicating that the operation should not be aborted.
termination_manager: The operation's _interfaces.TerminationManager.
transmission_manager: The operation's _interfaces.TransmissionManager.
ingestion_manager: The operation's _interfaces.IngestionManager.
@@ -99,12 +100,12 @@ def _abort_if_abortive(
Returns:
True if the operation was aborted; False otherwise.
"""
- abort_category = abortive(packet)
- if abort_category is None:
+ abortion_outcome = abortive(packet)
+ if abortion_outcome is None:
return False
else:
_abort(
- abort_category, termination_manager, transmission_manager,
+ abortion_outcome, termination_manager, transmission_manager,
ingestion_manager, expiration_manager)
return True
@@ -114,8 +115,8 @@ def _reception_failure(
expiration_manager):
"""Aborts the operation with an indication of reception failure."""
_abort(
- packets.Kind.RECEPTION_FAILURE, termination_manager, transmission_manager,
- ingestion_manager, expiration_manager)
+ base_interfaces.Outcome.RECEPTION_FAILURE, termination_manager,
+ transmission_manager, ingestion_manager, expiration_manager)
class _BackReceiver(_Receiver):
@@ -147,23 +148,22 @@ class _BackReceiver(_Receiver):
packet: A just-arrived packet.
Returns:
- One of packets.Kind.CANCELLATION, packets.Kind.SERVICED_FAILURE, or
- packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive
- and how, or None, indicating that the packet is not abortive.
+ A base_interfaces.Outcome value describing operation abortion if the
+ packet is abortive or None if the packet is not abortive.
"""
if packet.kind is packets.Kind.CANCELLATION:
- return packets.Kind.CANCELLATION
+ return base_interfaces.Outcome.CANCELLED
elif packet.kind is packets.Kind.EXPIRATION:
- return packets.Kind.EXPIRATION
+ return base_interfaces.Outcome.EXPIRED
elif packet.kind is packets.Kind.SERVICED_FAILURE:
- return packets.Kind.SERVICED_FAILURE
+ return base_interfaces.Outcome.SERVICED_FAILURE
elif packet.kind is packets.Kind.RECEPTION_FAILURE:
- return packets.Kind.SERVICED_FAILURE
+ return base_interfaces.Outcome.SERVICED_FAILURE
elif (packet.kind in (packets.Kind.COMMENCEMENT, packets.Kind.ENTIRE) and
self._first_packet_seen):
- return packets.Kind.RECEPTION_FAILURE
+ return base_interfaces.Outcome.RECEPTION_FAILURE
elif self._last_packet_seen:
- return packets.Kind.RECEPTION_FAILURE
+ return base_interfaces.Outcome.RECEPTION_FAILURE
else:
return None
@@ -236,18 +236,17 @@ class _FrontReceiver(_Receiver):
packet: A just-arrived packet.
Returns:
- One of packets.Kind.EXPIRATION, packets.Kind.SERVICER_FAILURE, or
- packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive
- and how, or None, indicating that the packet is not abortive.
+ A base_interfaces.Outcome value describing operation abortion if the
+ packet is abortive or None if the packet is not abortive.
"""
if packet.kind is packets.Kind.EXPIRATION:
- return packets.Kind.EXPIRATION
+ return base_interfaces.Outcome.EXPIRED
elif packet.kind is packets.Kind.SERVICER_FAILURE:
- return packets.Kind.SERVICER_FAILURE
+ return base_interfaces.Outcome.SERVICER_FAILURE
elif packet.kind is packets.Kind.RECEPTION_FAILURE:
- return packets.Kind.SERVICER_FAILURE
+ return base_interfaces.Outcome.SERVICER_FAILURE
elif self._last_packet_seen:
- return packets.Kind.RECEPTION_FAILURE
+ return base_interfaces.Outcome.RECEPTION_FAILURE
else:
return None
diff --git a/src/python/src/grpc/framework/base/packets/_termination.py b/src/python/src/grpc/framework/base/packets/_termination.py
index 575eee65a8..6afba88fc4 100644
--- a/src/python/src/grpc/framework/base/packets/_termination.py
+++ b/src/python/src/grpc/framework/base/packets/_termination.py
@@ -34,21 +34,10 @@ import enum
from grpc.framework.base import interfaces
from grpc.framework.base.packets import _constants
from grpc.framework.base.packets import _interfaces
-from grpc.framework.base.packets import packets
from grpc.framework.foundation import callable_util
_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!'
-_KINDS_TO_OUTCOMES = {
- packets.Kind.COMPLETION: interfaces.Outcome.COMPLETED,
- packets.Kind.CANCELLATION: interfaces.Outcome.CANCELLED,
- packets.Kind.EXPIRATION: interfaces.Outcome.EXPIRED,
- packets.Kind.RECEPTION_FAILURE: interfaces.Outcome.RECEPTION_FAILURE,
- packets.Kind.TRANSMISSION_FAILURE: interfaces.Outcome.TRANSMISSION_FAILURE,
- packets.Kind.SERVICER_FAILURE: interfaces.Outcome.SERVICER_FAILURE,
- packets.Kind.SERVICED_FAILURE: interfaces.Outcome.SERVICED_FAILURE,
- }
-
@enum.unique
class _Requirement(enum.Enum):
@@ -78,8 +67,8 @@ class _TerminationManager(_interfaces.TerminationManager):
action: An action to call on operation termination.
requirements: A combination of _Requirement values identifying what
must finish for the operation to be considered completed.
- local_failure: A packets.Kind specifying what constitutes local failure of
- customer work.
+ local_failure: An interfaces.Outcome specifying what constitutes local
+ failure of customer work.
"""
self._work_pool = work_pool
self._utility_pool = utility_pool
@@ -89,27 +78,23 @@ class _TerminationManager(_interfaces.TerminationManager):
self._expiration_manager = None
self._outstanding_requirements = set(requirements)
- self._kind = None
+ self._outcome = None
self._callbacks = []
def set_expiration_manager(self, expiration_manager):
self._expiration_manager = expiration_manager
- def _terminate(self, kind):
+ def _terminate(self, outcome):
"""Terminates the operation.
Args:
- kind: One of packets.Kind.COMPLETION, packets.Kind.CANCELLATION,
- packets.Kind.EXPIRATION, packets.Kind.RECEPTION_FAILURE,
- packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or
- packets.Kind.SERVICED_FAILURE.
+ outcome: An interfaces.Outcome describing the outcome of the operation.
"""
self._expiration_manager.abort()
self._outstanding_requirements = None
callbacks = list(self._callbacks)
self._callbacks = None
- self._kind = kind
- outcome = _KINDS_TO_OUTCOMES[kind]
+ self._outcome = outcome
act = callable_util.with_exceptions_logged(
self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
@@ -122,7 +107,7 @@ class _TerminationManager(_interfaces.TerminationManager):
callback_outcome = callable_util.call_logging_exceptions(
callback, _CALLBACK_EXCEPTION_LOG_MESSAGE, outcome)
if callback_outcome.exception is not None:
- outcome = _KINDS_TO_OUTCOMES[self._local_failure]
+ outcome = self._local_failure
break
self._utility_pool.submit(act, outcome)
@@ -141,8 +126,7 @@ class _TerminationManager(_interfaces.TerminationManager):
if self._outstanding_requirements is None:
self._work_pool.submit(
callable_util.with_exceptions_logged(
- callback, _CALLBACK_EXCEPTION_LOG_MESSAGE),
- _KINDS_TO_OUTCOMES[self._kind])
+ callback, _CALLBACK_EXCEPTION_LOG_MESSAGE), self._outcome)
else:
self._callbacks.append(callback)
@@ -151,28 +135,28 @@ class _TerminationManager(_interfaces.TerminationManager):
if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_Requirement.EMISSION)
if not self._outstanding_requirements:
- self._terminate(packets.Kind.COMPLETION)
+ self._terminate(interfaces.Outcome.COMPLETED)
def transmission_complete(self):
"""See superclass method for specification."""
if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_Requirement.TRANSMISSION)
if not self._outstanding_requirements:
- self._terminate(packets.Kind.COMPLETION)
+ self._terminate(interfaces.Outcome.COMPLETED)
def ingestion_complete(self):
"""See superclass method for specification."""
if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_Requirement.INGESTION)
if not self._outstanding_requirements:
- self._terminate(packets.Kind.COMPLETION)
+ self._terminate(interfaces.Outcome.COMPLETED)
- def abort(self, kind):
+ def abort(self, outcome):
"""See _interfaces.TerminationManager.abort for specification."""
- if kind == self._local_failure:
+ if outcome is self._local_failure:
self._has_failed_locally = True
if self._outstanding_requirements is not None:
- self._terminate(kind)
+ self._terminate(outcome)
def front_termination_manager(
@@ -195,7 +179,7 @@ def front_termination_manager(
return _TerminationManager(
work_pool, utility_pool, action, requirements,
- packets.Kind.SERVICED_FAILURE)
+ interfaces.Outcome.SERVICED_FAILURE)
def back_termination_manager(work_pool, utility_pool, action, subscription_kind):
@@ -217,4 +201,4 @@ def back_termination_manager(work_pool, utility_pool, action, subscription_kind)
return _TerminationManager(
work_pool, utility_pool, action, requirements,
- packets.Kind.SERVICER_FAILURE)
+ interfaces.Outcome.SERVICER_FAILURE)
diff --git a/src/python/src/grpc/framework/base/packets/_transmission.py b/src/python/src/grpc/framework/base/packets/_transmission.py
index ac7f4509db..002519b9a1 100644
--- a/src/python/src/grpc/framework/base/packets/_transmission.py
+++ b/src/python/src/grpc/framework/base/packets/_transmission.py
@@ -39,14 +39,24 @@ from grpc.framework.foundation import callable_util
_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
-_FRONT_TO_BACK_NO_TRANSMISSION_KINDS = (
- packets.Kind.SERVICER_FAILURE,
+_FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES = (
+ interfaces.Outcome.SERVICER_FAILURE,
)
-_BACK_TO_FRONT_NO_TRANSMISSION_KINDS = (
- packets.Kind.CANCELLATION,
- packets.Kind.SERVICED_FAILURE,
+_BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES = (
+ interfaces.Outcome.CANCELLED,
+ interfaces.Outcome.SERVICED_FAILURE,
)
+_ABORTION_OUTCOME_TO_PACKET_KIND = {
+ interfaces.Outcome.CANCELLED: packets.Kind.CANCELLATION,
+ interfaces.Outcome.EXPIRED: packets.Kind.EXPIRATION,
+ interfaces.Outcome.RECEPTION_FAILURE: packets.Kind.RECEPTION_FAILURE,
+ interfaces.Outcome.TRANSMISSION_FAILURE: packets.Kind.TRANSMISSION_FAILURE,
+ interfaces.Outcome.SERVICED_FAILURE: packets.Kind.SERVICED_FAILURE,
+ interfaces.Outcome.SERVICER_FAILURE: packets.Kind.SERVICER_FAILURE,
+}
+
+
class _Packetizer(object):
"""Common specification of different packet-creating behavior."""
@@ -72,18 +82,18 @@ class _Packetizer(object):
raise NotImplementedError()
@abc.abstractmethod
- def packetize_abortion(self, operation_id, sequence_number, kind):
+ def packetize_abortion(self, operation_id, sequence_number, outcome):
"""Creates a packet indicating that the operation is aborted.
Args:
operation_id: The operation ID for the current operation.
sequence_number: A sequence number for the packet.
- kind: One of the values of packets.Kind indicating operational abortion.
+ outcome: An interfaces.Outcome value describing the operation abortion.
Returns:
An object of an appropriate type suitable for transmission to the other
side of the operation, or None if transmission is not appropriate for
- the given kind.
+ the given outcome.
"""
raise NotImplementedError()
@@ -122,11 +132,12 @@ class _FrontPacketizer(_Packetizer):
self._name, self._subscription_kind, self._trace_id, payload,
self._timeout)
- def packetize_abortion(self, operation_id, sequence_number, kind):
+ def packetize_abortion(self, operation_id, sequence_number, outcome):
"""See _Packetizer.packetize_abortion for specification."""
- if kind in _FRONT_TO_BACK_NO_TRANSMISSION_KINDS:
+ if outcome in _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES:
return None
else:
+ kind = _ABORTION_OUTCOME_TO_PACKET_KIND[outcome]
return packets.FrontToBackPacket(
operation_id, sequence_number, kind, None, None, None, None, None)
@@ -141,11 +152,12 @@ class _BackPacketizer(_Packetizer):
packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION,
payload)
- def packetize_abortion(self, operation_id, sequence_number, kind):
+ def packetize_abortion(self, operation_id, sequence_number, outcome):
"""See _Packetizer.packetize_abortion for specification."""
- if kind in _BACK_TO_FRONT_NO_TRANSMISSION_KINDS:
+ if outcome in _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES:
return None
else:
+ kind = _ABORTION_OUTCOME_TO_PACKET_KIND[outcome]
return packets.BackToFrontPacket(
operation_id, sequence_number, kind, None)
@@ -178,7 +190,7 @@ class _EmptyTransmissionManager(TransmissionManager):
def inmit(self, emission, complete):
"""See _interfaces.TransmissionManager.inmit for specification."""
- def abort(self, category):
+ def abort(self, outcome):
"""See _interfaces.TransmissionManager.abort for specification."""
@@ -212,7 +224,7 @@ class _TransmittingTransmissionManager(TransmissionManager):
self._emissions = []
self._emission_complete = False
- self._kind = None
+ self._outcome = None
self._lowest_unused_sequence_number = 0
self._transmitting = False
@@ -239,17 +251,17 @@ class _TransmittingTransmissionManager(TransmissionManager):
return self._packetizer.packetize(
self._operation_id, sequence_number, emission, complete)
- def _abortive_response_packet(self, kind):
+ def _abortive_response_packet(self, outcome):
"""Creates a packet indicating operation abortion.
Args:
- kind: One of the values of packets.Kind indicating operational abortion.
+ outcome: An interfaces.Outcome value describing operation abortion.
Returns:
A packet indicating operation abortion.
"""
packet = self._packetizer.packetize_abortion(
- self._operation_id, self._lowest_unused_sequence_number, kind)
+ self._operation_id, self._lowest_unused_sequence_number, outcome)
if packet is None:
return None
else:
@@ -267,7 +279,7 @@ class _TransmittingTransmissionManager(TransmissionManager):
"""
if self._emissions is None:
return False, None
- elif self._kind is None:
+ elif self._outcome is None:
if self._emissions:
payload = self._emissions.pop(0)
complete = self._emission_complete and not self._emissions
@@ -278,7 +290,7 @@ class _TransmittingTransmissionManager(TransmissionManager):
else:
return self._emission_complete, None
else:
- packet = self._abortive_response_packet(self._kind)
+ packet = self._abortive_response_packet(self._outcome)
self._emissions = None
return False, None if packet is None else packet
@@ -303,7 +315,8 @@ class _TransmittingTransmissionManager(TransmissionManager):
else:
with self._lock:
self._emissions = None
- self._termination_manager.abort(packets.Kind.TRANSMISSION_FAILURE)
+ self._termination_manager.abort(
+ interfaces.Outcome.TRANSMISSION_FAILURE)
self._ingestion_manager.abort()
self._expiration_manager.abort()
self._transmitting = False
@@ -315,19 +328,19 @@ class _TransmittingTransmissionManager(TransmissionManager):
def inmit(self, emission, complete):
"""See _interfaces.TransmissionManager.inmit for specification."""
- if self._emissions is not None and self._kind is None:
+ if self._emissions is not None and self._outcome is None:
self._emission_complete = complete
if self._transmitting:
self._emissions.append(emission)
else:
self._transmit(self._lead_packet(emission, complete))
- def abort(self, kind):
+ def abort(self, outcome):
"""See _interfaces.TransmissionManager.abort for specification."""
- if self._emissions is not None and self._kind is None:
- self._kind = kind
+ if self._emissions is not None and self._outcome is None:
+ self._outcome = outcome
if not self._transmitting:
- packet = self._abortive_response_packet(kind)
+ packet = self._abortive_response_packet(outcome)
self._emissions = None
if packet is not None:
self._transmit(packet)