aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/framework/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/grpc/framework/core')
-rw-r--r--src/python/grpcio/grpc/framework/core/_context.py2
-rw-r--r--src/python/grpcio/grpc/framework/core/_emission.py3
-rw-r--r--src/python/grpcio/grpc/framework/core/_expiration.py2
-rw-r--r--src/python/grpcio/grpc/framework/core/_ingestion.py66
-rw-r--r--src/python/grpcio/grpc/framework/core/_interfaces.py8
-rw-r--r--src/python/grpcio/grpc/framework/core/_operation.py2
-rw-r--r--src/python/grpcio/grpc/framework/core/_reception.py2
-rw-r--r--src/python/grpcio/grpc/framework/core/_transmission.py14
8 files changed, 65 insertions, 34 deletions
diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py
index 24a12b612e..76b3534530 100644
--- a/src/python/grpcio/grpc/framework/core/_context.py
+++ b/src/python/grpcio/grpc/framework/core/_context.py
@@ -60,7 +60,7 @@ class OperationContext(base.OperationContext):
with self._lock:
if self._termination_manager.outcome is None:
self._termination_manager.abort(outcome)
- self._transmission_manager.abort(outcome)
+ self._transmission_manager.abort(outcome, None, None)
self._expiration_manager.terminate()
def outcome(self):
diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py
index 7c702ab2ce..2d7b2e2f10 100644
--- a/src/python/grpcio/grpc/framework/core/_emission.py
+++ b/src/python/grpcio/grpc/framework/core/_emission.py
@@ -82,7 +82,8 @@ class EmissionManager(_interfaces.EmissionManager):
completion_present and self._completion_seen or
allowance_present and allowance <= 0):
self._termination_manager.abort(base.Outcome.LOCAL_FAILURE)
- self._transmission_manager.abort(base.Outcome.LOCAL_FAILURE)
+ self._transmission_manager.abort(
+ base.Outcome.LOCAL_FAILURE, None, None)
self._expiration_manager.terminate()
else:
self._initial_metadata_seen |= initial_metadata_present
diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py
index d94bdf2d2b..d8690b3a02 100644
--- a/src/python/grpcio/grpc/framework/core/_expiration.py
+++ b/src/python/grpcio/grpc/framework/core/_expiration.py
@@ -73,7 +73,7 @@ class _ExpirationManager(_interfaces.ExpirationManager):
if self._future is not None and index == self._index:
self._future = None
self._termination_manager.expire()
- self._transmission_manager.abort(base.Outcome.EXPIRED)
+ self._transmission_manager.abort(base.Outcome.EXPIRED, None, None)
return expire
def start(self):
diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py
index 59f7f8adc8..7b8127f3fc 100644
--- a/src/python/grpcio/grpc/framework/core/_ingestion.py
+++ b/src/python/grpcio/grpc/framework/core/_ingestion.py
@@ -31,6 +31,7 @@
import abc
import collections
+import enum
from grpc.framework.core import _constants
from grpc.framework.core import _interfaces
@@ -42,21 +43,31 @@ _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!'
_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
-class _SubscriptionCreation(collections.namedtuple(
- '_SubscriptionCreation', ('subscription', 'remote_error', 'abandoned'))):
+class _SubscriptionCreation(
+ collections.namedtuple(
+ '_SubscriptionCreation',
+ ('kind', 'subscription', 'code', 'message',))):
"""A sum type for the outcome of ingestion initialization.
- Either subscription will be non-None, remote_error will be True, or abandoned
- will be True.
-
Attributes:
- subscription: A base.Subscription describing the customer's interest in
- operation values from the other side.
- remote_error: A boolean indicating that the subscription could not be
- created due to an error on the remote side of the operation.
- abandoned: A boolean indicating that subscription creation was abandoned.
+ kind: A Kind value coarsely indicating how subscription creation completed.
+ subscription: The created subscription. Only present if kind is
+ Kind.SUBSCRIPTION.
+ code: A code value to be sent to the other side of the operation along with
+ an indication that the operation is being aborted due to an error on the
+ remote side of the operation. Only present if kind is Kind.REMOTE_ERROR.
+ message: A message value to be sent to the other side of the operation
+ along with an indication that the operation is being aborted due to an
+ error on the remote side of the operation. Only present if kind is
+ Kind.REMOTE_ERROR.
"""
+ @enum.unique
+ class Kind(enum.Enum):
+ SUBSCRIPTION = 'subscription'
+ REMOTE_ERROR = 'remote error'
+ ABANDONED = 'abandoned'
+
class _SubscriptionCreator(object):
"""Common specification of subscription-creating behavior."""
@@ -101,12 +112,15 @@ class _ServiceSubscriptionCreator(_SubscriptionCreator):
try:
subscription = self._servicer.service(
group, method, self._operation_context, self._output_operator)
- except base.NoSuchMethodError:
- return _SubscriptionCreation(None, True, False)
+ except base.NoSuchMethodError as e:
+ return _SubscriptionCreation(
+ _SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.message)
except abandonment.Abandoned:
- return _SubscriptionCreation(None, False, True)
+ return _SubscriptionCreation(
+ _SubscriptionCreation.Kind.ABANDONED, None, None, None)
else:
- return _SubscriptionCreation(subscription, False, False)
+ return _SubscriptionCreation(
+ _SubscriptionCreation.Kind.SUBSCRIPTION, subscription, None, None)
def _wrap(behavior):
@@ -176,10 +190,10 @@ class _IngestionManager(_interfaces.IngestionManager):
self._pending_payloads = None
self._pending_completion = None
- def _abort_and_notify(self, outcome):
+ def _abort_and_notify(self, outcome, code, message):
self._abort_internal_only()
self._termination_manager.abort(outcome)
- self._transmission_manager.abort(outcome)
+ self._transmission_manager.abort(outcome, code, message)
self._expiration_manager.terminate()
def _operator_next(self):
@@ -236,12 +250,12 @@ class _IngestionManager(_interfaces.IngestionManager):
else:
with self._lock:
if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
return
else:
with self._lock:
if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
return
def _operator_post_create(self, subscription):
@@ -260,20 +274,22 @@ class _IngestionManager(_interfaces.IngestionManager):
def _create(self, subscription_creator, group, name):
outcome = callable_util.call_logging_exceptions(
- subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE,
- group, name)
+ subscription_creator.create,
+ _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE, group, name)
if outcome.return_value is None:
with self._lock:
if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
- elif outcome.return_value.abandoned:
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
+ elif outcome.return_value.kind is _SubscriptionCreation.Kind.ABANDONED:
with self._lock:
if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
- elif outcome.return_value.remote_error:
+ self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None)
+ elif outcome.return_value.kind is _SubscriptionCreation.Kind.REMOTE_ERROR:
+ code = outcome.return_value.code
+ message = outcome.return_value.message
with self._lock:
if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.REMOTE_FAILURE)
+ self._abort_and_notify(base.Outcome.REMOTE_FAILURE, code, message)
elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL:
self._operator_post_create(outcome.return_value.subscription)
else:
diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py
index a626b9f767..deb5f34f9b 100644
--- a/src/python/grpcio/grpc/framework/core/_interfaces.py
+++ b/src/python/grpcio/grpc/framework/core/_interfaces.py
@@ -155,13 +155,19 @@ class TransmissionManager(object):
raise NotImplementedError()
@abc.abstractmethod
- def abort(self, outcome):
+ def abort(self, outcome, code, message):
"""Indicates that the operation has aborted.
Args:
outcome: An interfaces.Outcome for the operation. If None, indicates that
the operation abortion should not be communicated to the other side of
the operation.
+ code: A code value to communicate to the other side of the operation
+ along with indication of operation abortion. May be None, and has no
+ effect if outcome is None.
+ message: A message value to communicate to the other side of the
+ operation along with indication of operation abortion. May be None, and
+ has no effect if outcome is None.
"""
raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py
index d20e40a53d..cc873c03f9 100644
--- a/src/python/grpcio/grpc/framework/core/_operation.py
+++ b/src/python/grpcio/grpc/framework/core/_operation.py
@@ -79,7 +79,7 @@ class _EasyOperation(_interfaces.Operation):
with self._lock:
if self._termination_manager.outcome is None:
self._termination_manager.abort(outcome)
- self._transmission_manager.abort(outcome)
+ self._transmission_manager.abort(outcome, None, None)
self._expiration_manager.terminate()
diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py
index 0858f64ff6..1cebe3874b 100644
--- a/src/python/grpcio/grpc/framework/core/_reception.py
+++ b/src/python/grpcio/grpc/framework/core/_reception.py
@@ -73,7 +73,7 @@ class ReceptionManager(_interfaces.ReceptionManager):
self._aborted = True
if self._termination_manager.outcome is None:
self._termination_manager.abort(outcome)
- self._transmission_manager.abort(None)
+ self._transmission_manager.abort(None, None, None)
self._expiration_manager.terminate()
def _sequence_failure(self, ticket):
diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py
index 03644f4d49..efef87dd4c 100644
--- a/src/python/grpcio/grpc/framework/core/_transmission.py
+++ b/src/python/grpcio/grpc/framework/core/_transmission.py
@@ -104,9 +104,13 @@ class TransmissionManager(_interfaces.TransmissionManager):
return None
else:
self._abortion_outcome = None
+ if self._completion is None:
+ code, message = None, None
+ else:
+ code, message = self._completion.code, self._completion.message
return links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None,
- None, None, None, None, None, None, None, None, None,
+ None, None, None, None, None, None, None, code, message,
termination, None)
action = False
@@ -277,7 +281,7 @@ class TransmissionManager(_interfaces.TransmissionManager):
self._remote_complete = True
self._local_allowance = 0
- def abort(self, outcome):
+ def abort(self, outcome, code, message):
"""See _interfaces.TransmissionManager.abort for specification."""
if self._transmitting:
self._aborted, self._abortion_outcome = True, outcome
@@ -287,8 +291,12 @@ class TransmissionManager(_interfaces.TransmissionManager):
termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[
outcome]
if termination is not None:
+ if self._completion is None:
+ code, message = None, None
+ else:
+ code, message = self._completion.code, self._completion.message
ticket = links.Ticket(
self._operation_id, self._lowest_unused_sequence_number, None,
- None, None, None, None, None, None, None, None, None,
+ None, None, None, None, None, None, None, code, message,
termination, None)
self._transmit(ticket)