diff options
author | Masood Malekghassemi <soltanmm@users.noreply.github.com> | 2015-03-16 17:12:43 -0700 |
---|---|---|
committer | Masood Malekghassemi <soltanmm@users.noreply.github.com> | 2015-03-16 17:12:43 -0700 |
commit | afdc24ae2b38318de7721fa8ab4684e460104ed7 (patch) | |
tree | 5f5df7b42f66540f595573411f736a64eda04e76 /src | |
parent | ea34b1f078ed8768b7adf96a01e93679ac6f24b7 (diff) | |
parent | 8d81365a3c03184683782cdd2dd8a9c934c790c7 (diff) |
Merge pull request #1055 from nathanielmanistaatgoogle/python-refactoring
Python refactoring
Diffstat (limited to 'src')
10 files changed, 131 insertions, 140 deletions
diff --git a/src/python/src/grpc/framework/base/packets/_cancellation.py b/src/python/src/grpc/framework/base/packets/_cancellation.py index 2373c78842..4a0ced1440 100644 --- a/src/python/src/grpc/framework/base/packets/_cancellation.py +++ b/src/python/src/grpc/framework/base/packets/_cancellation.py @@ -29,6 +29,7 @@ """State and behavior for operation cancellation.""" +from grpc.framework.base import interfaces as base_interfaces from grpc.framework.base.packets import _interfaces from grpc.framework.base.packets import packets @@ -58,7 +59,7 @@ class CancellationManager(_interfaces.CancellationManager): def cancel(self): """See _interfaces.CancellationManager.cancel for specification.""" with self._lock: - self._termination_manager.abort(packets.Kind.CANCELLATION) - self._transmission_manager.abort(packets.Kind.CANCELLATION) + self._termination_manager.abort(base_interfaces.Outcome.CANCELLED) + self._transmission_manager.abort(base_interfaces.Outcome.CANCELLED) self._ingestion_manager.abort() self._expiration_manager.abort() diff --git a/src/python/src/grpc/framework/base/packets/_context.py b/src/python/src/grpc/framework/base/packets/_context.py index e09d4a60c9..45241c639e 100644 --- a/src/python/src/grpc/framework/base/packets/_context.py +++ b/src/python/src/grpc/framework/base/packets/_context.py @@ -31,10 +31,9 @@ import time -# _interfaces and packets are referenced from specification in this module. +# _interfaces is referenced from specification in this module. from grpc.framework.base import interfaces as base_interfaces from grpc.framework.base.packets import _interfaces # pylint: disable=unused-import -from grpc.framework.base.packets import packets # pylint: disable=unused-import class OperationContext(base_interfaces.OperationContext): @@ -48,8 +47,9 @@ class OperationContext(base_interfaces.OperationContext): Args: lock: The operation-wide lock. operation_id: An object identifying the operation. - local_failure: Whichever one of packets.Kind.SERVICED_FAILURE or - packets.Kind.SERVICER_FAILURE describes local failure of customer code. + local_failure: Whichever one of base_interfaces.Outcome.SERVICED_FAILURE + or base_interfaces.Outcome.SERVICER_FAILURE describes local failure of + customer code. termination_manager: The _interfaces.TerminationManager for the operation. transmission_manager: The _interfaces.TransmissionManager for the operation. diff --git a/src/python/src/grpc/framework/base/packets/_emission.py b/src/python/src/grpc/framework/base/packets/_emission.py index 9446b8665d..cfc9e40a24 100644 --- a/src/python/src/grpc/framework/base/packets/_emission.py +++ b/src/python/src/grpc/framework/base/packets/_emission.py @@ -29,29 +29,29 @@ """State and behavior for handling emitted values.""" -# packets is referenced from specifications in this module. +from grpc.framework.base import interfaces as base_interfaces from grpc.framework.base.packets import _interfaces -from grpc.framework.base.packets import packets # pylint: disable=unused-import class _EmissionManager(_interfaces.EmissionManager): """An implementation of _interfaces.EmissionManager.""" def __init__( - self, lock, failure_kind, termination_manager, transmission_manager): + self, lock, failure_outcome, termination_manager, transmission_manager): """Constructor. Args: lock: The operation-wide lock. - failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or - packets.Kind.SERVICER_FAILURE describes this object's methods being - called inappropriately by customer code. + failure_outcome: Whichever one of + base_interfaces.Outcome.SERVICED_FAILURE or + base_interfaces.Outcome.SERVICER_FAILURE describes this object's + methods being called inappropriately by customer code. termination_manager: The _interfaces.TerminationManager for the operation. transmission_manager: The _interfaces.TransmissionManager for the operation. """ self._lock = lock - self._failure_kind = failure_kind + self._failure_outcome = failure_outcome self._termination_manager = termination_manager self._transmission_manager = transmission_manager self._ingestion_manager = None @@ -65,8 +65,8 @@ class _EmissionManager(_interfaces.EmissionManager): self._expiration_manager = expiration_manager def _abort(self): - self._termination_manager.abort(self._failure_kind) - self._transmission_manager.abort(self._failure_kind) + self._termination_manager.abort(self._failure_outcome) + self._transmission_manager.abort(self._failure_outcome) self._ingestion_manager.abort() self._expiration_manager.abort() @@ -106,7 +106,7 @@ def front_emission_manager(lock, termination_manager, transmission_manager): An _interfaces.EmissionManager appropriate for front-side use. """ return _EmissionManager( - lock, packets.Kind.SERVICED_FAILURE, termination_manager, + lock, base_interfaces.Outcome.SERVICED_FAILURE, termination_manager, transmission_manager) @@ -122,5 +122,5 @@ def back_emission_manager(lock, termination_manager, transmission_manager): An _interfaces.EmissionManager appropriate for back-side use. """ return _EmissionManager( - lock, packets.Kind.SERVICER_FAILURE, termination_manager, + lock, base_interfaces.Outcome.SERVICER_FAILURE, termination_manager, transmission_manager) diff --git a/src/python/src/grpc/framework/base/packets/_ends.py b/src/python/src/grpc/framework/base/packets/_ends.py index ac369c4fbd..614d1f666e 100644 --- a/src/python/src/grpc/framework/base/packets/_ends.py +++ b/src/python/src/grpc/framework/base/packets/_ends.py @@ -50,16 +50,6 @@ from grpc.framework.foundation import callable_util _IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!' -_OPERATION_OUTCOMES = ( - base_interfaces.Outcome.COMPLETED, - base_interfaces.Outcome.CANCELLED, - base_interfaces.Outcome.EXPIRED, - base_interfaces.Outcome.RECEPTION_FAILURE, - base_interfaces.Outcome.TRANSMISSION_FAILURE, - base_interfaces.Outcome.SERVICER_FAILURE, - base_interfaces.Outcome.SERVICED_FAILURE, - ) - class _EasyOperation(base_interfaces.Operation): """A trivial implementation of base_interfaces.Operation.""" @@ -98,7 +88,7 @@ class _Endlette(object): # indicates an in-progress fire-and-forget operation for which the customer # has chosen to ignore results. self._operations = {} - self._stats = {outcome: 0 for outcome in _OPERATION_OUTCOMES} + self._stats = {outcome: 0 for outcome in base_interfaces.Outcome} self._idle_actions = [] def terminal_action(self, operation_id): @@ -198,7 +188,7 @@ def _front_operate( lock, transmission_pool, callback, operation_id, name, subscription.kind, trace_id, timeout, termination_manager) operation_context = _context.OperationContext( - lock, operation_id, packets.Kind.SERVICED_FAILURE, + lock, operation_id, base_interfaces.Outcome.SERVICED_FAILURE, termination_manager, transmission_manager) emission_manager = _emission.front_emission_manager( lock, termination_manager, transmission_manager) @@ -327,7 +317,7 @@ def _back_operate( lock, transmission_pool, callback, ticket.operation_id, termination_manager, ticket.subscription) operation_context = _context.OperationContext( - lock, ticket.operation_id, packets.Kind.SERVICER_FAILURE, + lock, ticket.operation_id, base_interfaces.Outcome.SERVICER_FAILURE, termination_manager, transmission_manager) emission_manager = _emission.back_emission_manager( lock, termination_manager, transmission_manager) diff --git a/src/python/src/grpc/framework/base/packets/_expiration.py b/src/python/src/grpc/framework/base/packets/_expiration.py index f58db28aa2..a9ecaeaa63 100644 --- a/src/python/src/grpc/framework/base/packets/_expiration.py +++ b/src/python/src/grpc/framework/base/packets/_expiration.py @@ -31,8 +31,8 @@ import time +from grpc.framework.base import interfaces as base_interfaces from grpc.framework.base.packets import _interfaces -from grpc.framework.base.packets import packets from grpc.framework.foundation import later @@ -73,8 +73,8 @@ class _ExpirationManager(_interfaces.ExpirationManager): with self._lock: if self._future is not None and index == self._index: self._future = None - self._termination_manager.abort(packets.Kind.EXPIRATION) - self._transmission_manager.abort(packets.Kind.EXPIRATION) + self._termination_manager.abort(base_interfaces.Outcome.EXPIRED) + self._transmission_manager.abort(base_interfaces.Outcome.EXPIRED) self._ingestion_manager.abort() def start(self): diff --git a/src/python/src/grpc/framework/base/packets/_ingestion.py b/src/python/src/grpc/framework/base/packets/_ingestion.py index a750195ccb..c5c08fd98e 100644 --- a/src/python/src/grpc/framework/base/packets/_ingestion.py +++ b/src/python/src/grpc/framework/base/packets/_ingestion.py @@ -206,7 +206,7 @@ class _IngestionManager(_interfaces.IngestionManager): """An implementation of _interfaces.IngestionManager.""" def __init__( - self, lock, pool, consumer_creator, failure_kind, termination_manager, + self, lock, pool, consumer_creator, failure_outcome, termination_manager, transmission_manager): """Constructor. @@ -216,8 +216,10 @@ class _IngestionManager(_interfaces.IngestionManager): consumer_creator: A _ConsumerCreator wrapping the portion of customer code that when called returns the stream.Consumer with which the customer code will ingest payload values. - failure_kind: Whichever one of packets.Kind.SERVICED_FAILURE or - packets.Kind.SERVICER_FAILURE describes local failure of customer code. + failure_outcome: Whichever one of + interfaces.Outcome.SERVICED_FAILURE or + interfaces.Outcome.SERVICER_FAILURE describes local failure of + customer code. termination_manager: The _interfaces.TerminationManager for the operation. transmission_manager: The _interfaces.TransmissionManager for the operation. @@ -225,7 +227,7 @@ class _IngestionManager(_interfaces.IngestionManager): self._lock = lock self._pool = pool self._consumer_creator = consumer_creator - self._failure_kind = failure_kind + self._failure_outcome = failure_outcome self._termination_manager = termination_manager self._transmission_manager = transmission_manager self._expiration_manager = None @@ -299,12 +301,12 @@ class _IngestionManager(_interfaces.IngestionManager): else: with self._lock: if self._pending_ingestion is not None: - self._abort_and_notify(self._failure_kind) + self._abort_and_notify(self._failure_outcome) self._processing = False return else: with self._lock: - self._abort_and_notify(self._failure_kind) + self._abort_and_notify(self._failure_outcome) self._processing = False return @@ -316,16 +318,16 @@ class _IngestionManager(_interfaces.IngestionManager): _CREATE_CONSUMER_EXCEPTION_LOG_MESSAGE, requirement) if consumer_creation_outcome.return_value is None: with self._lock: - self._abort_and_notify(self._failure_kind) + self._abort_and_notify(self._failure_outcome) self._processing = False elif consumer_creation_outcome.return_value.remote_error: with self._lock: - self._abort_and_notify(packets.Kind.RECEPTION_FAILURE) + self._abort_and_notify(interfaces.Outcome.RECEPTION_FAILURE) self._processing = False elif consumer_creation_outcome.return_value.abandoned: with self._lock: if self._pending_ingestion is not None: - self._abort_and_notify(self._failure_kind) + self._abort_and_notify(self._failure_outcome) self._processing = False else: wrapped_ingestion_consumer = _WrappedConsumer( @@ -346,7 +348,7 @@ class _IngestionManager(_interfaces.IngestionManager): def consume(self, payload): if self._ingestion_complete: - self._abort_and_notify(self._failure_kind) + self._abort_and_notify(self._failure_outcome) elif self._pending_ingestion is not None: if self._processing: self._pending_ingestion.append(payload) @@ -359,7 +361,7 @@ class _IngestionManager(_interfaces.IngestionManager): def terminate(self): if self._ingestion_complete: - self._abort_and_notify(self._failure_kind) + self._abort_and_notify(self._failure_outcome) else: self._ingestion_complete = True if self._pending_ingestion is not None and not self._processing: @@ -371,7 +373,7 @@ class _IngestionManager(_interfaces.IngestionManager): def consume_and_terminate(self, payload): if self._ingestion_complete: - self._abort_and_notify(self._failure_kind) + self._abort_and_notify(self._failure_outcome) else: self._ingestion_complete = True if self._pending_ingestion is not None: @@ -397,19 +399,20 @@ def front_ingestion_manager( Args: lock: The operation-wide lock. pool: A thread pool in which to execute customer code. - subscription: A base_interfaces.ServicedSubscription indicating the + subscription: A interfaces.ServicedSubscription indicating the customer's interest in the results of the operation. termination_manager: The _interfaces.TerminationManager for the operation. transmission_manager: The _interfaces.TransmissionManager for the operation. - operation_context: A base_interfaces.OperationContext for the operation. + operation_context: A interfaces.OperationContext for the operation. Returns: An IngestionManager appropriate for front-side use. """ ingestion_manager = _IngestionManager( lock, pool, _FrontConsumerCreator(subscription, operation_context), - packets.Kind.SERVICED_FAILURE, termination_manager, transmission_manager) + interfaces.Outcome.SERVICED_FAILURE, termination_manager, + transmission_manager) ingestion_manager.start(None) return ingestion_manager @@ -422,11 +425,11 @@ def back_ingestion_manager( Args: lock: The operation-wide lock. pool: A thread pool in which to execute customer code. - servicer: A base_interfaces.Servicer for servicing the operation. + servicer: A interfaces.Servicer for servicing the operation. termination_manager: The _interfaces.TerminationManager for the operation. transmission_manager: The _interfaces.TransmissionManager for the operation. - operation_context: A base_interfaces.OperationContext for the operation. + operation_context: A interfaces.OperationContext for the operation. emission_consumer: The _interfaces.EmissionConsumer for the operation. Returns: @@ -435,5 +438,6 @@ def back_ingestion_manager( ingestion_manager = _IngestionManager( lock, pool, _BackConsumerCreator( servicer, operation_context, emission_consumer), - packets.Kind.SERVICER_FAILURE, termination_manager, transmission_manager) + interfaces.Outcome.SERVICER_FAILURE, termination_manager, + transmission_manager) return ingestion_manager diff --git a/src/python/src/grpc/framework/base/packets/_interfaces.py b/src/python/src/grpc/framework/base/packets/_interfaces.py index 64dc33e8d5..64184bdf7c 100644 --- a/src/python/src/grpc/framework/base/packets/_interfaces.py +++ b/src/python/src/grpc/framework/base/packets/_interfaces.py @@ -83,11 +83,11 @@ class TerminationManager(object): raise NotImplementedError() @abc.abstractmethod - def abort(self, kind): + def abort(self, outcome): """Indicates that the operation must abort for the indicated reason. Args: - kind: A value of packets.Kind indicating operation abortion. + outcome: A base_interfaces.Outcome indicating operation abortion. """ raise NotImplementedError() @@ -109,11 +109,11 @@ class TransmissionManager(object): raise NotImplementedError() @abc.abstractmethod - def abort(self, kind): + def abort(self, outcome): """Indicates that the operation has aborted for the indicated reason. Args: - kind: A value of packets.Kind indicating operation abortion. + outcome: A base_interfaces.Outcome indicating operation abortion. """ raise NotImplementedError() diff --git a/src/python/src/grpc/framework/base/packets/_reception.py b/src/python/src/grpc/framework/base/packets/_reception.py index 6e2c9c0a4e..becbef828d 100644 --- a/src/python/src/grpc/framework/base/packets/_reception.py +++ b/src/python/src/grpc/framework/base/packets/_reception.py @@ -31,6 +31,7 @@ import abc +from grpc.framework.base import interfaces as base_interfaces from grpc.framework.base.packets import _interfaces from grpc.framework.base.packets import packets @@ -72,11 +73,11 @@ class _Receiver(object): def _abort( - category, termination_manager, transmission_manager, ingestion_manager, + outcome, termination_manager, transmission_manager, ingestion_manager, expiration_manager): - """Indicates abortion with the given category to the given managers.""" - termination_manager.abort(category) - transmission_manager.abort(category) + """Indicates abortion with the given outcome to the given managers.""" + termination_manager.abort(outcome) + transmission_manager.abort(outcome) ingestion_manager.abort() expiration_manager.abort() @@ -88,9 +89,9 @@ def _abort_if_abortive( Args: packet: A just-arrived packet. - abortive: A callable that takes a packet and returns an operation category - indicating that the operation should be aborted or None indicating that - the operation should not be aborted. + abortive: A callable that takes a packet and returns a + base_interfaces.Outcome indicating that the operation should be aborted + or None indicating that the operation should not be aborted. termination_manager: The operation's _interfaces.TerminationManager. transmission_manager: The operation's _interfaces.TransmissionManager. ingestion_manager: The operation's _interfaces.IngestionManager. @@ -99,12 +100,12 @@ def _abort_if_abortive( Returns: True if the operation was aborted; False otherwise. """ - abort_category = abortive(packet) - if abort_category is None: + abortion_outcome = abortive(packet) + if abortion_outcome is None: return False else: _abort( - abort_category, termination_manager, transmission_manager, + abortion_outcome, termination_manager, transmission_manager, ingestion_manager, expiration_manager) return True @@ -114,8 +115,8 @@ def _reception_failure( expiration_manager): """Aborts the operation with an indication of reception failure.""" _abort( - packets.Kind.RECEPTION_FAILURE, termination_manager, transmission_manager, - ingestion_manager, expiration_manager) + base_interfaces.Outcome.RECEPTION_FAILURE, termination_manager, + transmission_manager, ingestion_manager, expiration_manager) class _BackReceiver(_Receiver): @@ -147,23 +148,22 @@ class _BackReceiver(_Receiver): packet: A just-arrived packet. Returns: - One of packets.Kind.CANCELLATION, packets.Kind.SERVICED_FAILURE, or - packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive - and how, or None, indicating that the packet is not abortive. + A base_interfaces.Outcome value describing operation abortion if the + packet is abortive or None if the packet is not abortive. """ if packet.kind is packets.Kind.CANCELLATION: - return packets.Kind.CANCELLATION + return base_interfaces.Outcome.CANCELLED elif packet.kind is packets.Kind.EXPIRATION: - return packets.Kind.EXPIRATION + return base_interfaces.Outcome.EXPIRED elif packet.kind is packets.Kind.SERVICED_FAILURE: - return packets.Kind.SERVICED_FAILURE + return base_interfaces.Outcome.SERVICED_FAILURE elif packet.kind is packets.Kind.RECEPTION_FAILURE: - return packets.Kind.SERVICED_FAILURE + return base_interfaces.Outcome.SERVICED_FAILURE elif (packet.kind in (packets.Kind.COMMENCEMENT, packets.Kind.ENTIRE) and self._first_packet_seen): - return packets.Kind.RECEPTION_FAILURE + return base_interfaces.Outcome.RECEPTION_FAILURE elif self._last_packet_seen: - return packets.Kind.RECEPTION_FAILURE + return base_interfaces.Outcome.RECEPTION_FAILURE else: return None @@ -236,18 +236,17 @@ class _FrontReceiver(_Receiver): packet: A just-arrived packet. Returns: - One of packets.Kind.EXPIRATION, packets.Kind.SERVICER_FAILURE, or - packets.Kind.RECEPTION_FAILURE, indicating that the packet is abortive - and how, or None, indicating that the packet is not abortive. + A base_interfaces.Outcome value describing operation abortion if the + packet is abortive or None if the packet is not abortive. """ if packet.kind is packets.Kind.EXPIRATION: - return packets.Kind.EXPIRATION + return base_interfaces.Outcome.EXPIRED elif packet.kind is packets.Kind.SERVICER_FAILURE: - return packets.Kind.SERVICER_FAILURE + return base_interfaces.Outcome.SERVICER_FAILURE elif packet.kind is packets.Kind.RECEPTION_FAILURE: - return packets.Kind.SERVICER_FAILURE + return base_interfaces.Outcome.SERVICER_FAILURE elif self._last_packet_seen: - return packets.Kind.RECEPTION_FAILURE + return base_interfaces.Outcome.RECEPTION_FAILURE else: return None diff --git a/src/python/src/grpc/framework/base/packets/_termination.py b/src/python/src/grpc/framework/base/packets/_termination.py index 575eee65a8..6afba88fc4 100644 --- a/src/python/src/grpc/framework/base/packets/_termination.py +++ b/src/python/src/grpc/framework/base/packets/_termination.py @@ -34,21 +34,10 @@ import enum from grpc.framework.base import interfaces from grpc.framework.base.packets import _constants from grpc.framework.base.packets import _interfaces -from grpc.framework.base.packets import packets from grpc.framework.foundation import callable_util _CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!' -_KINDS_TO_OUTCOMES = { - packets.Kind.COMPLETION: interfaces.Outcome.COMPLETED, - packets.Kind.CANCELLATION: interfaces.Outcome.CANCELLED, - packets.Kind.EXPIRATION: interfaces.Outcome.EXPIRED, - packets.Kind.RECEPTION_FAILURE: interfaces.Outcome.RECEPTION_FAILURE, - packets.Kind.TRANSMISSION_FAILURE: interfaces.Outcome.TRANSMISSION_FAILURE, - packets.Kind.SERVICER_FAILURE: interfaces.Outcome.SERVICER_FAILURE, - packets.Kind.SERVICED_FAILURE: interfaces.Outcome.SERVICED_FAILURE, - } - @enum.unique class _Requirement(enum.Enum): @@ -78,8 +67,8 @@ class _TerminationManager(_interfaces.TerminationManager): action: An action to call on operation termination. requirements: A combination of _Requirement values identifying what must finish for the operation to be considered completed. - local_failure: A packets.Kind specifying what constitutes local failure of - customer work. + local_failure: An interfaces.Outcome specifying what constitutes local + failure of customer work. """ self._work_pool = work_pool self._utility_pool = utility_pool @@ -89,27 +78,23 @@ class _TerminationManager(_interfaces.TerminationManager): self._expiration_manager = None self._outstanding_requirements = set(requirements) - self._kind = None + self._outcome = None self._callbacks = [] def set_expiration_manager(self, expiration_manager): self._expiration_manager = expiration_manager - def _terminate(self, kind): + def _terminate(self, outcome): """Terminates the operation. Args: - kind: One of packets.Kind.COMPLETION, packets.Kind.CANCELLATION, - packets.Kind.EXPIRATION, packets.Kind.RECEPTION_FAILURE, - packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or - packets.Kind.SERVICED_FAILURE. + outcome: An interfaces.Outcome describing the outcome of the operation. """ self._expiration_manager.abort() self._outstanding_requirements = None callbacks = list(self._callbacks) self._callbacks = None - self._kind = kind - outcome = _KINDS_TO_OUTCOMES[kind] + self._outcome = outcome act = callable_util.with_exceptions_logged( self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE) @@ -122,7 +107,7 @@ class _TerminationManager(_interfaces.TerminationManager): callback_outcome = callable_util.call_logging_exceptions( callback, _CALLBACK_EXCEPTION_LOG_MESSAGE, outcome) if callback_outcome.exception is not None: - outcome = _KINDS_TO_OUTCOMES[self._local_failure] + outcome = self._local_failure break self._utility_pool.submit(act, outcome) @@ -141,8 +126,7 @@ class _TerminationManager(_interfaces.TerminationManager): if self._outstanding_requirements is None: self._work_pool.submit( callable_util.with_exceptions_logged( - callback, _CALLBACK_EXCEPTION_LOG_MESSAGE), - _KINDS_TO_OUTCOMES[self._kind]) + callback, _CALLBACK_EXCEPTION_LOG_MESSAGE), self._outcome) else: self._callbacks.append(callback) @@ -151,28 +135,28 @@ class _TerminationManager(_interfaces.TerminationManager): if self._outstanding_requirements is not None: self._outstanding_requirements.discard(_Requirement.EMISSION) if not self._outstanding_requirements: - self._terminate(packets.Kind.COMPLETION) + self._terminate(interfaces.Outcome.COMPLETED) def transmission_complete(self): """See superclass method for specification.""" if self._outstanding_requirements is not None: self._outstanding_requirements.discard(_Requirement.TRANSMISSION) if not self._outstanding_requirements: - self._terminate(packets.Kind.COMPLETION) + self._terminate(interfaces.Outcome.COMPLETED) def ingestion_complete(self): """See superclass method for specification.""" if self._outstanding_requirements is not None: self._outstanding_requirements.discard(_Requirement.INGESTION) if not self._outstanding_requirements: - self._terminate(packets.Kind.COMPLETION) + self._terminate(interfaces.Outcome.COMPLETED) - def abort(self, kind): + def abort(self, outcome): """See _interfaces.TerminationManager.abort for specification.""" - if kind == self._local_failure: + if outcome is self._local_failure: self._has_failed_locally = True if self._outstanding_requirements is not None: - self._terminate(kind) + self._terminate(outcome) def front_termination_manager( @@ -195,7 +179,7 @@ def front_termination_manager( return _TerminationManager( work_pool, utility_pool, action, requirements, - packets.Kind.SERVICED_FAILURE) + interfaces.Outcome.SERVICED_FAILURE) def back_termination_manager(work_pool, utility_pool, action, subscription_kind): @@ -217,4 +201,4 @@ def back_termination_manager(work_pool, utility_pool, action, subscription_kind) return _TerminationManager( work_pool, utility_pool, action, requirements, - packets.Kind.SERVICER_FAILURE) + interfaces.Outcome.SERVICER_FAILURE) diff --git a/src/python/src/grpc/framework/base/packets/_transmission.py b/src/python/src/grpc/framework/base/packets/_transmission.py index ac7f4509db..002519b9a1 100644 --- a/src/python/src/grpc/framework/base/packets/_transmission.py +++ b/src/python/src/grpc/framework/base/packets/_transmission.py @@ -39,14 +39,24 @@ from grpc.framework.foundation import callable_util _TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' -_FRONT_TO_BACK_NO_TRANSMISSION_KINDS = ( - packets.Kind.SERVICER_FAILURE, +_FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES = ( + interfaces.Outcome.SERVICER_FAILURE, ) -_BACK_TO_FRONT_NO_TRANSMISSION_KINDS = ( - packets.Kind.CANCELLATION, - packets.Kind.SERVICED_FAILURE, +_BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES = ( + interfaces.Outcome.CANCELLED, + interfaces.Outcome.SERVICED_FAILURE, ) +_ABORTION_OUTCOME_TO_PACKET_KIND = { + interfaces.Outcome.CANCELLED: packets.Kind.CANCELLATION, + interfaces.Outcome.EXPIRED: packets.Kind.EXPIRATION, + interfaces.Outcome.RECEPTION_FAILURE: packets.Kind.RECEPTION_FAILURE, + interfaces.Outcome.TRANSMISSION_FAILURE: packets.Kind.TRANSMISSION_FAILURE, + interfaces.Outcome.SERVICED_FAILURE: packets.Kind.SERVICED_FAILURE, + interfaces.Outcome.SERVICER_FAILURE: packets.Kind.SERVICER_FAILURE, +} + + class _Packetizer(object): """Common specification of different packet-creating behavior.""" @@ -72,18 +82,18 @@ class _Packetizer(object): raise NotImplementedError() @abc.abstractmethod - def packetize_abortion(self, operation_id, sequence_number, kind): + def packetize_abortion(self, operation_id, sequence_number, outcome): """Creates a packet indicating that the operation is aborted. Args: operation_id: The operation ID for the current operation. sequence_number: A sequence number for the packet. - kind: One of the values of packets.Kind indicating operational abortion. + outcome: An interfaces.Outcome value describing the operation abortion. Returns: An object of an appropriate type suitable for transmission to the other side of the operation, or None if transmission is not appropriate for - the given kind. + the given outcome. """ raise NotImplementedError() @@ -122,11 +132,12 @@ class _FrontPacketizer(_Packetizer): self._name, self._subscription_kind, self._trace_id, payload, self._timeout) - def packetize_abortion(self, operation_id, sequence_number, kind): + def packetize_abortion(self, operation_id, sequence_number, outcome): """See _Packetizer.packetize_abortion for specification.""" - if kind in _FRONT_TO_BACK_NO_TRANSMISSION_KINDS: + if outcome in _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES: return None else: + kind = _ABORTION_OUTCOME_TO_PACKET_KIND[outcome] return packets.FrontToBackPacket( operation_id, sequence_number, kind, None, None, None, None, None) @@ -141,11 +152,12 @@ class _BackPacketizer(_Packetizer): packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION, payload) - def packetize_abortion(self, operation_id, sequence_number, kind): + def packetize_abortion(self, operation_id, sequence_number, outcome): """See _Packetizer.packetize_abortion for specification.""" - if kind in _BACK_TO_FRONT_NO_TRANSMISSION_KINDS: + if outcome in _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES: return None else: + kind = _ABORTION_OUTCOME_TO_PACKET_KIND[outcome] return packets.BackToFrontPacket( operation_id, sequence_number, kind, None) @@ -178,7 +190,7 @@ class _EmptyTransmissionManager(TransmissionManager): def inmit(self, emission, complete): """See _interfaces.TransmissionManager.inmit for specification.""" - def abort(self, category): + def abort(self, outcome): """See _interfaces.TransmissionManager.abort for specification.""" @@ -212,7 +224,7 @@ class _TransmittingTransmissionManager(TransmissionManager): self._emissions = [] self._emission_complete = False - self._kind = None + self._outcome = None self._lowest_unused_sequence_number = 0 self._transmitting = False @@ -239,17 +251,17 @@ class _TransmittingTransmissionManager(TransmissionManager): return self._packetizer.packetize( self._operation_id, sequence_number, emission, complete) - def _abortive_response_packet(self, kind): + def _abortive_response_packet(self, outcome): """Creates a packet indicating operation abortion. Args: - kind: One of the values of packets.Kind indicating operational abortion. + outcome: An interfaces.Outcome value describing operation abortion. Returns: A packet indicating operation abortion. """ packet = self._packetizer.packetize_abortion( - self._operation_id, self._lowest_unused_sequence_number, kind) + self._operation_id, self._lowest_unused_sequence_number, outcome) if packet is None: return None else: @@ -267,7 +279,7 @@ class _TransmittingTransmissionManager(TransmissionManager): """ if self._emissions is None: return False, None - elif self._kind is None: + elif self._outcome is None: if self._emissions: payload = self._emissions.pop(0) complete = self._emission_complete and not self._emissions @@ -278,7 +290,7 @@ class _TransmittingTransmissionManager(TransmissionManager): else: return self._emission_complete, None else: - packet = self._abortive_response_packet(self._kind) + packet = self._abortive_response_packet(self._outcome) self._emissions = None return False, None if packet is None else packet @@ -303,7 +315,8 @@ class _TransmittingTransmissionManager(TransmissionManager): else: with self._lock: self._emissions = None - self._termination_manager.abort(packets.Kind.TRANSMISSION_FAILURE) + self._termination_manager.abort( + interfaces.Outcome.TRANSMISSION_FAILURE) self._ingestion_manager.abort() self._expiration_manager.abort() self._transmitting = False @@ -315,19 +328,19 @@ class _TransmittingTransmissionManager(TransmissionManager): def inmit(self, emission, complete): """See _interfaces.TransmissionManager.inmit for specification.""" - if self._emissions is not None and self._kind is None: + if self._emissions is not None and self._outcome is None: self._emission_complete = complete if self._transmitting: self._emissions.append(emission) else: self._transmit(self._lead_packet(emission, complete)) - def abort(self, kind): + def abort(self, outcome): """See _interfaces.TransmissionManager.abort for specification.""" - if self._emissions is not None and self._kind is None: - self._kind = kind + if self._emissions is not None and self._outcome is None: + self._outcome = outcome if not self._transmitting: - packet = self._abortive_response_packet(kind) + packet = self._abortive_response_packet(outcome) self._emissions = None if packet is not None: self._transmit(packet) |