diff options
Diffstat (limited to 'src/python/grpcio/grpc/framework/core/_ingestion.py')
-rw-r--r-- | src/python/grpcio/grpc/framework/core/_ingestion.py | 66 |
1 files changed, 41 insertions, 25 deletions
diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py index 59f7f8adc8..7b8127f3fc 100644 --- a/src/python/grpcio/grpc/framework/core/_ingestion.py +++ b/src/python/grpcio/grpc/framework/core/_ingestion.py @@ -31,6 +31,7 @@ import abc import collections +import enum from grpc.framework.core import _constants from grpc.framework.core import _interfaces @@ -42,21 +43,31 @@ _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!' _INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!' -class _SubscriptionCreation(collections.namedtuple( - '_SubscriptionCreation', ('subscription', 'remote_error', 'abandoned'))): +class _SubscriptionCreation( + collections.namedtuple( + '_SubscriptionCreation', + ('kind', 'subscription', 'code', 'message',))): """A sum type for the outcome of ingestion initialization. - Either subscription will be non-None, remote_error will be True, or abandoned - will be True. - Attributes: - subscription: A base.Subscription describing the customer's interest in - operation values from the other side. - remote_error: A boolean indicating that the subscription could not be - created due to an error on the remote side of the operation. - abandoned: A boolean indicating that subscription creation was abandoned. + kind: A Kind value coarsely indicating how subscription creation completed. + subscription: The created subscription. Only present if kind is + Kind.SUBSCRIPTION. + code: A code value to be sent to the other side of the operation along with + an indication that the operation is being aborted due to an error on the + remote side of the operation. Only present if kind is Kind.REMOTE_ERROR. + message: A message value to be sent to the other side of the operation + along with an indication that the operation is being aborted due to an + error on the remote side of the operation. Only present if kind is + Kind.REMOTE_ERROR. """ + @enum.unique + class Kind(enum.Enum): + SUBSCRIPTION = 'subscription' + REMOTE_ERROR = 'remote error' + ABANDONED = 'abandoned' + class _SubscriptionCreator(object): """Common specification of subscription-creating behavior.""" @@ -101,12 +112,15 @@ class _ServiceSubscriptionCreator(_SubscriptionCreator): try: subscription = self._servicer.service( group, method, self._operation_context, self._output_operator) - except base.NoSuchMethodError: - return _SubscriptionCreation(None, True, False) + except base.NoSuchMethodError as e: + return _SubscriptionCreation( + _SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.message) except abandonment.Abandoned: - return _SubscriptionCreation(None, False, True) + return _SubscriptionCreation( + _SubscriptionCreation.Kind.ABANDONED, None, None, None) else: - return _SubscriptionCreation(subscription, False, False) + return _SubscriptionCreation( + _SubscriptionCreation.Kind.SUBSCRIPTION, subscription, None, None) def _wrap(behavior): @@ -176,10 +190,10 @@ class _IngestionManager(_interfaces.IngestionManager): self._pending_payloads = None self._pending_completion = None - def _abort_and_notify(self, outcome): + def _abort_and_notify(self, outcome, code, message): self._abort_internal_only() self._termination_manager.abort(outcome) - self._transmission_manager.abort(outcome) + self._transmission_manager.abort(outcome, code, message) self._expiration_manager.terminate() def _operator_next(self): @@ -236,12 +250,12 @@ class _IngestionManager(_interfaces.IngestionManager): else: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) return else: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) return def _operator_post_create(self, subscription): @@ -260,20 +274,22 @@ class _IngestionManager(_interfaces.IngestionManager): def _create(self, subscription_creator, group, name): outcome = callable_util.call_logging_exceptions( - subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE, - group, name) + subscription_creator.create, + _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE, group, name) if outcome.return_value is None: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE) - elif outcome.return_value.abandoned: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) + elif outcome.return_value.kind is _SubscriptionCreation.Kind.ABANDONED: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE) - elif outcome.return_value.remote_error: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) + elif outcome.return_value.kind is _SubscriptionCreation.Kind.REMOTE_ERROR: + code = outcome.return_value.code + message = outcome.return_value.message with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.REMOTE_FAILURE) + self._abort_and_notify(base.Outcome.REMOTE_FAILURE, code, message) elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL: self._operator_post_create(outcome.return_value.subscription) else: |