diff options
Diffstat (limited to 'src/python')
37 files changed, 528 insertions, 207 deletions
diff --git a/src/python/README.md b/src/python/README.md index a21deb33ef..67d1a173a2 100644 --- a/src/python/README.md +++ b/src/python/README.md @@ -4,7 +4,7 @@ The Python facility of gRPC. Status ------- -Alpha : Ready for early adopters +Beta : Core behavior well-used and proven; bugs lurk off the beaten path. PREREQUISITES ------------- diff --git a/src/python/grpcio/grpc/_adapter/_c/types.h b/src/python/grpcio/grpc/_adapter/_c/types.h index ec0687a9fd..31fd470d36 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types.h +++ b/src/python/grpcio/grpc/_adapter/_c/types.h @@ -112,6 +112,8 @@ void pygrpc_Call_dealloc(Call *self); PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs); PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs); PyObject *pygrpc_Call_peer(Call *self); +PyObject *pygrpc_Call_set_credentials(Call *self, PyObject *args, + PyObject *kwargs); extern PyTypeObject pygrpc_Call_type; diff --git a/src/python/grpcio/grpc/_adapter/_c/types/call.c b/src/python/grpcio/grpc/_adapter/_c/types/call.c index 42a50151f6..5604aba39d 100644 --- a/src/python/grpcio/grpc/_adapter/_c/types/call.c +++ b/src/python/grpcio/grpc/_adapter/_c/types/call.c @@ -43,6 +43,8 @@ PyMethodDef pygrpc_Call_methods[] = { {"start_batch", (PyCFunction)pygrpc_Call_start_batch, METH_KEYWORDS, ""}, {"cancel", (PyCFunction)pygrpc_Call_cancel, METH_KEYWORDS, ""}, {"peer", (PyCFunction)pygrpc_Call_peer, METH_NOARGS, ""}, + {"set_credentials", (PyCFunction)pygrpc_Call_set_credentials, METH_KEYWORDS, + ""}, {NULL} }; const char pygrpc_Call_doc[] = "See grpc._adapter._types.Call."; @@ -169,3 +171,16 @@ PyObject *pygrpc_Call_peer(Call *self) { gpr_free(peer); return py_peer; } +PyObject *pygrpc_Call_set_credentials(Call *self, PyObject *args, + PyObject *kwargs) { + ClientCredentials *creds; + grpc_call_error errcode; + static char *keywords[] = {"creds", NULL}; + if (!PyArg_ParseTupleAndKeywords( + args, kwargs, "O!:set_credentials", keywords, + &pygrpc_ClientCredentials_type, &creds)) { + return NULL; + } + errcode = grpc_call_set_credentials(self->c_call, creds->c_creds); + return PyInt_FromLong(errcode); +} diff --git a/src/python/grpcio/grpc/_adapter/_intermediary_low.py b/src/python/grpcio/grpc/_adapter/_intermediary_low.py index 06358e72bc..735ad205a4 100644 --- a/src/python/grpcio/grpc/_adapter/_intermediary_low.py +++ b/src/python/grpcio/grpc/_adapter/_intermediary_low.py @@ -163,6 +163,9 @@ class Call(object): def cancel(self): return self._internal.cancel() + def set_credentials(self, creds): + return self._internal.set_credentials(creds) + class Channel(object): """Adapter from old _low.Channel interface to new _low.Channel.""" diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py index 3859ebb0e2..70ceb2a911 100644 --- a/src/python/grpcio/grpc/_adapter/_low.py +++ b/src/python/grpcio/grpc/_adapter/_low.py @@ -78,6 +78,9 @@ class Call(_types.Call): def peer(self): return self.call.peer() + def set_credentials(self, creds): + return self.call.set_credentials(creds) + class Channel(_types.Channel): diff --git a/src/python/grpcio/grpc/_links/_constants.py b/src/python/grpcio/grpc/_links/_constants.py new file mode 100644 index 0000000000..117fc5a639 --- /dev/null +++ b/src/python/grpcio/grpc/_links/_constants.py @@ -0,0 +1,42 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Constants for use within this package.""" + +from grpc._adapter import _intermediary_low +from grpc.beta import interfaces as beta_interfaces + +LOW_STATUS_CODE_TO_HIGH_STATUS_CODE = { + low: high for low, high in zip( + _intermediary_low.Code, beta_interfaces.StatusCode) +} + +HIGH_STATUS_CODE_TO_LOW_STATUS_CODE = { + high: low for low, high in LOW_STATUS_CODE_TO_HIGH_STATUS_CODE.items() +} diff --git a/src/python/grpcio/grpc/_links/invocation.py b/src/python/grpcio/grpc/_links/invocation.py index 1676fe7941..fecb550ae0 100644 --- a/src/python/grpcio/grpc/_links/invocation.py +++ b/src/python/grpcio/grpc/_links/invocation.py @@ -36,6 +36,7 @@ import threading import time from grpc._adapter import _intermediary_low +from grpc._links import _constants from grpc.framework.foundation import activated from grpc.framework.foundation import logging_pool from grpc.framework.foundation import relay @@ -168,14 +169,17 @@ class _Kernel(object): termination = links.Ticket.Termination.CANCELLATION elif event.status.code is _intermediary_low.Code.DEADLINE_EXCEEDED: termination = links.Ticket.Termination.EXPIRATION + elif event.status.code is _intermediary_low.Code.UNIMPLEMENTED: + termination = links.Ticket.Termination.REMOTE_FAILURE elif event.status.code is _intermediary_low.Code.UNKNOWN: termination = links.Ticket.Termination.LOCAL_FAILURE else: termination = links.Ticket.Termination.TRANSMISSION_FAILURE + code = _constants.LOW_STATUS_CODE_TO_HIGH_STATUS_CODE[event.status.code] ticket = links.Ticket( operation_id, rpc_state.sequence_number, None, None, None, None, None, - None, None, event.metadata, event.status.code, event.status.details, - termination, None) + None, None, event.metadata, code, event.status.details, termination, + None) rpc_state.sequence_number += 1 self._relay.add_value(ticket) diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py index 94e7cfc716..34d3b262c9 100644 --- a/src/python/grpcio/grpc/_links/service.py +++ b/src/python/grpcio/grpc/_links/service.py @@ -36,6 +36,7 @@ import threading import time from grpc._adapter import _intermediary_low +from grpc._links import _constants from grpc.framework.foundation import logging_pool from grpc.framework.foundation import relay from grpc.framework.interfaces.links import links @@ -122,13 +123,13 @@ def _metadatafy(call, metadata): call.add_metadata(metadata_key, metadata_value) -def _status(termination_kind, code, details): - effective_details = b'' if details is None else details - if code is None: - effective_code = _TERMINATION_KIND_TO_CODE[termination_kind] +def _status(termination_kind, high_code, details): + low_details = b'' if details is None else details + if high_code is None: + low_code = _TERMINATION_KIND_TO_CODE[termination_kind] else: - effective_code = code - return _intermediary_low.Status(effective_code, effective_details) + low_code = _constants.HIGH_STATUS_CODE_TO_LOW_STATUS_CODE[high_code] + return _intermediary_low.Status(low_code, low_details) class _Kernel(object): diff --git a/src/python/grpcio/grpc/beta/_server.py b/src/python/grpcio/grpc/beta/_server.py index 4e46ffd17f..ebf91d80ab 100644 --- a/src/python/grpcio/grpc/beta/_server.py +++ b/src/python/grpcio/grpc/beta/_server.py @@ -32,9 +32,11 @@ import threading from grpc._links import service +from grpc.beta import interfaces from grpc.framework.core import implementations as _core_implementations from grpc.framework.crust import implementations as _crust_implementations from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.base import base from grpc.framework.interfaces.links import utilities _DEFAULT_POOL_SIZE = 8 @@ -42,6 +44,23 @@ _DEFAULT_TIMEOUT = 300 _MAXIMUM_TIMEOUT = 24 * 60 * 60 +class _GRPCServicer(base.Servicer): + + def __init__(self, delegate): + self._delegate = delegate + + def service(self, group, method, context, output_operator): + try: + return self._delegate.service(group, method, context, output_operator) + except base.NoSuchMethodError as e: + if e.code is None and e.details is None: + raise base.NoSuchMethodError( + interfaces.StatusCode.UNIMPLEMENTED, + b'Method "%s" of service "%s" not implemented!' % (method, group)) + else: + raise + + def _disassemble(grpc_link, end_link, pool, event, grace): grpc_link.begin_stop() end_link.stop(grace).wait() @@ -99,8 +118,9 @@ def server( service_thread_pool = thread_pool assembly_thread_pool = None - servicer = _crust_implementations.servicer( - implementations, multi_implementation, service_thread_pool) + servicer = _GRPCServicer( + _crust_implementations.servicer( + implementations, multi_implementation, service_thread_pool)) grpc_link = service.service_link(request_deserializers, response_serializers) diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py new file mode 100644 index 0000000000..25e6a9c66b --- /dev/null +++ b/src/python/grpcio/grpc/beta/interfaces.py @@ -0,0 +1,54 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Constants and interfaces of the Beta API of gRPC Python.""" + +import enum + + +@enum.unique +class StatusCode(enum.Enum): + """Mirrors grpc_status_code in the C core.""" + OK = 0 + CANCELLED = 1 + UNKNOWN = 2 + INVALID_ARGUMENT = 3 + DEADLINE_EXCEEDED = 4 + NOT_FOUND = 5 + ALREADY_EXISTS = 6 + PERMISSION_DENIED = 7 + RESOURCE_EXHAUSTED = 8 + FAILED_PRECONDITION = 9 + ABORTED = 10 + OUT_OF_RANGE = 11 + UNIMPLEMENTED = 12 + INTERNAL = 13 + UNAVAILABLE = 14 + DATA_LOSS = 15 + UNAUTHENTICATED = 16 diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py index d3be3a4c4a..0f47cb48e0 100644 --- a/src/python/grpcio/grpc/framework/core/_constants.py +++ b/src/python/grpcio/grpc/framework/core/_constants.py @@ -44,14 +44,15 @@ TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = { # ticket should be sent to the other side in the event of such an # outcome. ABORTION_OUTCOME_TO_TICKET_TERMINATION = { - base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION, - base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION, - base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN, - base.Outcome.REMOTE_SHUTDOWN: None, - base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE, - base.Outcome.TRANSMISSION_FAILURE: None, - base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE, - base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE, + base.Outcome.Kind.CANCELLED: links.Ticket.Termination.CANCELLATION, + base.Outcome.Kind.EXPIRED: links.Ticket.Termination.EXPIRATION, + base.Outcome.Kind.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN, + base.Outcome.Kind.REMOTE_SHUTDOWN: None, + base.Outcome.Kind.RECEPTION_FAILURE: + links.Ticket.Termination.RECEPTION_FAILURE, + base.Outcome.Kind.TRANSMISSION_FAILURE: None, + base.Outcome.Kind.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE, + base.Outcome.Kind.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE, } INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:' diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py index 76b3534530..a346e9d478 100644 --- a/src/python/grpcio/grpc/framework/core/_context.py +++ b/src/python/grpcio/grpc/framework/core/_context.py @@ -33,6 +33,7 @@ import time # _interfaces is referenced from specification in this module. from grpc.framework.core import _interfaces # pylint: disable=unused-import +from grpc.framework.core import _utilities from grpc.framework.interfaces.base import base @@ -56,11 +57,12 @@ class OperationContext(base.OperationContext): self._transmission_manager = transmission_manager self._expiration_manager = expiration_manager - def _abort(self, outcome): + def _abort(self, outcome_kind): with self._lock: if self._termination_manager.outcome is None: + outcome = _utilities.Outcome(outcome_kind, None, None) self._termination_manager.abort(outcome) - self._transmission_manager.abort(outcome, None, None) + self._transmission_manager.abort(outcome) self._expiration_manager.terminate() def outcome(self): @@ -85,8 +87,8 @@ class OperationContext(base.OperationContext): def cancel(self): """See base.OperationContext.cancel for specification.""" - self._abort(base.Outcome.CANCELLED) + self._abort(base.Outcome.Kind.CANCELLED) def fail(self, exception): """See base.OperationContext.fail for specification.""" - self._abort(base.Outcome.LOCAL_FAILURE) + self._abort(base.Outcome.Kind.LOCAL_FAILURE) diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py index 2d7b2e2f10..8ab59dc3e5 100644 --- a/src/python/grpcio/grpc/framework/core/_emission.py +++ b/src/python/grpcio/grpc/framework/core/_emission.py @@ -30,6 +30,7 @@ """State and behavior for handling emitted values.""" from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.interfaces.base import base @@ -81,9 +82,10 @@ class EmissionManager(_interfaces.EmissionManager): payload_present and self._completion_seen or completion_present and self._completion_seen or allowance_present and allowance <= 0): - self._termination_manager.abort(base.Outcome.LOCAL_FAILURE) - self._transmission_manager.abort( - base.Outcome.LOCAL_FAILURE, None, None) + outcome = _utilities.Outcome( + base.Outcome.Kind.LOCAL_FAILURE, None, None) + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) self._expiration_manager.terminate() else: self._initial_metadata_seen |= initial_metadata_present diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py index f57cde4e58..336e9c21fd 100644 --- a/src/python/grpcio/grpc/framework/core/_end.py +++ b/src/python/grpcio/grpc/framework/core/_end.py @@ -69,7 +69,7 @@ class _Cycle(object): def _abort(operations): for operation in operations: - operation.abort(base.Outcome.LOCAL_SHUTDOWN) + operation.abort(base.Outcome.Kind.LOCAL_SHUTDOWN) def _cancel_futures(futures): @@ -90,19 +90,19 @@ def _termination_action(lock, stats, operation_id, cycle): Args: lock: A lock to hold during the termination action. - states: A mapping from base.Outcome values to integers to increment with - the outcome given to the termination action. + stats: A mapping from base.Outcome.Kind values to integers to increment + with the outcome kind given to the termination action. operation_id: The operation ID for the termination action. cycle: A _Cycle value to be updated during the termination action. Returns: - A callable that takes an operation outcome as its sole parameter and that - should be used as the termination action for the operation associated - with the given operation ID. + A callable that takes an operation outcome kind as its sole parameter and + that should be used as the termination action for the operation + associated with the given operation ID. """ - def termination_action(outcome): + def termination_action(outcome_kind): with lock: - stats[outcome] += 1 + stats[outcome_kind] += 1 cycle.operations.pop(operation_id, None) if not cycle.operations: for action in cycle.idle_actions: @@ -127,7 +127,7 @@ class _End(End): self._lock = threading.Condition() self._servicer_package = servicer_package - self._stats = {outcome: 0 for outcome in base.Outcome} + self._stats = {outcome_kind: 0 for outcome_kind in base.Outcome.Kind} self._mate = None diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py index d8690b3a02..ded0ab6bce 100644 --- a/src/python/grpcio/grpc/framework/core/_expiration.py +++ b/src/python/grpcio/grpc/framework/core/_expiration.py @@ -32,6 +32,7 @@ import time from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.foundation import later from grpc.framework.interfaces.base import base @@ -73,7 +74,8 @@ class _ExpirationManager(_interfaces.ExpirationManager): if self._future is not None and index == self._index: self._future = None self._termination_manager.expire() - self._transmission_manager.abort(base.Outcome.EXPIRED, None, None) + self._transmission_manager.abort( + _utilities.Outcome(base.Outcome.Kind.EXPIRED, None, None)) return expire def start(self): diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py index 7b8127f3fc..9a7959a2dd 100644 --- a/src/python/grpcio/grpc/framework/core/_ingestion.py +++ b/src/python/grpcio/grpc/framework/core/_ingestion.py @@ -35,6 +35,7 @@ import enum from grpc.framework.core import _constants from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.foundation import abandonment from grpc.framework.foundation import callable_util from grpc.framework.interfaces.base import base @@ -46,7 +47,7 @@ _INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!' class _SubscriptionCreation( collections.namedtuple( '_SubscriptionCreation', - ('kind', 'subscription', 'code', 'message',))): + ('kind', 'subscription', 'code', 'details',))): """A sum type for the outcome of ingestion initialization. Attributes: @@ -56,7 +57,7 @@ class _SubscriptionCreation( code: A code value to be sent to the other side of the operation along with an indication that the operation is being aborted due to an error on the remote side of the operation. Only present if kind is Kind.REMOTE_ERROR. - message: A message value to be sent to the other side of the operation + details: A details value to be sent to the other side of the operation along with an indication that the operation is being aborted due to an error on the remote side of the operation. Only present if kind is Kind.REMOTE_ERROR. @@ -114,7 +115,7 @@ class _ServiceSubscriptionCreator(_SubscriptionCreator): group, method, self._operation_context, self._output_operator) except base.NoSuchMethodError as e: return _SubscriptionCreation( - _SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.message) + _SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.details) except abandonment.Abandoned: return _SubscriptionCreation( _SubscriptionCreation.Kind.ABANDONED, None, None, None) @@ -190,11 +191,13 @@ class _IngestionManager(_interfaces.IngestionManager): self._pending_payloads = None self._pending_completion = None - def _abort_and_notify(self, outcome, code, message): + def _abort_and_notify(self, outcome_kind, code, details): self._abort_internal_only() - self._termination_manager.abort(outcome) - self._transmission_manager.abort(outcome, code, message) - self._expiration_manager.terminate() + if self._termination_manager.outcome is None: + outcome = _utilities.Outcome(outcome_kind, code, details) + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() def _operator_next(self): """Computes the next step for full-subscription ingestion. @@ -250,12 +253,13 @@ class _IngestionManager(_interfaces.IngestionManager): else: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) + self._abort_and_notify( + base.Outcome.Kind.LOCAL_FAILURE, None, None) return else: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) + self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None) return def _operator_post_create(self, subscription): @@ -279,17 +283,18 @@ class _IngestionManager(_interfaces.IngestionManager): if outcome.return_value is None: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) + self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None) elif outcome.return_value.kind is _SubscriptionCreation.Kind.ABANDONED: with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.LOCAL_FAILURE, None, None) + self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None) elif outcome.return_value.kind is _SubscriptionCreation.Kind.REMOTE_ERROR: code = outcome.return_value.code - message = outcome.return_value.message + details = outcome.return_value.details with self._lock: if self._termination_manager.outcome is None: - self._abort_and_notify(base.Outcome.REMOTE_FAILURE, code, message) + self._abort_and_notify( + base.Outcome.Kind.REMOTE_FAILURE, code, details) elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL: self._operator_post_create(outcome.return_value.subscription) else: diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py index deb5f34f9b..2a534cb7e7 100644 --- a/src/python/grpcio/grpc/framework/core/_interfaces.py +++ b/src/python/grpcio/grpc/framework/core/_interfaces.py @@ -50,13 +50,13 @@ class TerminationManager(object): If the operation has already terminated the callback will not be called. Args: - callback: A callable that will be passed an interfaces.Outcome value. + callback: A callable that will be passed a base.Outcome value. Returns: None if the operation has not yet terminated and the passed callback will - be called when it does, or a base.Outcome value describing the operation - termination if the operation has terminated and the callback will not be - called as a result of this method call. + be called when it does, or a base.Outcome value describing the + operation termination if the operation has terminated and the callback + will not be called as a result of this method call. """ raise NotImplementedError() @@ -76,8 +76,13 @@ class TerminationManager(object): raise NotImplementedError() @abc.abstractmethod - def reception_complete(self): - """Indicates that reception from the other side is complete.""" + def reception_complete(self, code, details): + """Indicates that reception from the other side is complete. + + Args: + code: An application-specific code value. + details: An application-specific details value. + """ raise NotImplementedError() @abc.abstractmethod @@ -95,7 +100,7 @@ class TerminationManager(object): """Indicates that the operation must abort for the indicated reason. Args: - outcome: An interfaces.Outcome indicating operation abortion. + outcome: A base.Outcome indicating operation abortion. """ raise NotImplementedError() @@ -155,19 +160,13 @@ class TransmissionManager(object): raise NotImplementedError() @abc.abstractmethod - def abort(self, outcome, code, message): + def abort(self, outcome): """Indicates that the operation has aborted. Args: - outcome: An interfaces.Outcome for the operation. If None, indicates that - the operation abortion should not be communicated to the other side of - the operation. - code: A code value to communicate to the other side of the operation - along with indication of operation abortion. May be None, and has no - effect if outcome is None. - message: A message value to communicate to the other side of the - operation along with indication of operation abortion. May be None, and - has no effect if outcome is None. + outcome: A base.Outcome for the operation. If None, indicates that the + operation abortion should not be communicated to the other side of the + operation. """ raise NotImplementedError() @@ -279,8 +278,7 @@ class ReceptionManager(object): """Handle a ticket from the other side of the operation. Args: - ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket - appropriate to this end of the operation and this object. + ticket: A links.Ticket for the operation. """ raise NotImplementedError() @@ -305,10 +303,10 @@ class Operation(object): raise NotImplementedError() @abc.abstractmethod - def abort(self, outcome): + def abort(self, outcome_kind): """Aborts the operation. Args: - outcome: A base.Outcome value indicating operation abortion. + outcome_kind: A base.Outcome.Kind value indicating operation abortion. """ raise NotImplementedError() diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py index cc873c03f9..f5679d0356 100644 --- a/src/python/grpcio/grpc/framework/core/_operation.py +++ b/src/python/grpcio/grpc/framework/core/_operation.py @@ -31,7 +31,6 @@ import threading -# _utilities is referenced from specification in this module. from grpc.framework.core import _context from grpc.framework.core import _emission from grpc.framework.core import _expiration @@ -40,7 +39,7 @@ from grpc.framework.core import _interfaces from grpc.framework.core import _reception from grpc.framework.core import _termination from grpc.framework.core import _transmission -from grpc.framework.core import _utilities # pylint: disable=unused-import +from grpc.framework.core import _utilities class _EasyOperation(_interfaces.Operation): @@ -75,11 +74,12 @@ class _EasyOperation(_interfaces.Operation): with self._lock: self._reception_manager.receive_ticket(ticket) - def abort(self, outcome): + def abort(self, outcome_kind): with self._lock: if self._termination_manager.outcome is None: + outcome = _utilities.Outcome(outcome_kind, None, None) self._termination_manager.abort(outcome) - self._transmission_manager.abort(outcome, None, None) + self._transmission_manager.abort(outcome) self._expiration_manager.terminate() diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py index 1cebe3874b..d374cf0c8c 100644 --- a/src/python/grpcio/grpc/framework/core/_reception.py +++ b/src/python/grpcio/grpc/framework/core/_reception.py @@ -30,21 +30,26 @@ """State and behavior for ticket reception.""" from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.interfaces.base import base from grpc.framework.interfaces.base import utilities from grpc.framework.interfaces.links import links -_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = { - links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED, - links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED, - links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN, - links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE, +_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME_KIND = { + links.Ticket.Termination.CANCELLATION: base.Outcome.Kind.CANCELLED, + links.Ticket.Termination.EXPIRATION: base.Outcome.Kind.EXPIRED, + links.Ticket.Termination.SHUTDOWN: base.Outcome.Kind.REMOTE_SHUTDOWN, + links.Ticket.Termination.RECEPTION_FAILURE: + base.Outcome.Kind.RECEPTION_FAILURE, links.Ticket.Termination.TRANSMISSION_FAILURE: - base.Outcome.TRANSMISSION_FAILURE, - links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE, - links.Ticket.Termination.REMOTE_FAILURE: base.Outcome.LOCAL_FAILURE, + base.Outcome.Kind.TRANSMISSION_FAILURE, + links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.Kind.REMOTE_FAILURE, + links.Ticket.Termination.REMOTE_FAILURE: base.Outcome.Kind.LOCAL_FAILURE, } +_RECEPTION_FAILURE_OUTCOME = _utilities.Outcome( + base.Outcome.Kind.RECEPTION_FAILURE, None, None) + class ReceptionManager(_interfaces.ReceptionManager): """A ReceptionManager based around a _Receiver passed to it.""" @@ -73,7 +78,7 @@ class ReceptionManager(_interfaces.ReceptionManager): self._aborted = True if self._termination_manager.outcome is None: self._termination_manager.abort(outcome) - self._transmission_manager.abort(None, None, None) + self._transmission_manager.abort(None) self._expiration_manager.terminate() def _sequence_failure(self, ticket): @@ -102,6 +107,7 @@ class ReceptionManager(_interfaces.ReceptionManager): else: completion = utilities.completion( ticket.terminal_metadata, ticket.code, ticket.message) + self._termination_manager.reception_complete(ticket.code, ticket.message) self._ingestion_manager.advance( ticket.initial_metadata, ticket.payload, completion, ticket.allowance) if ticket.allowance is not None: @@ -129,10 +135,12 @@ class ReceptionManager(_interfaces.ReceptionManager): if self._aborted: return elif self._sequence_failure(ticket): - self._abort(base.Outcome.RECEPTION_FAILURE) + self._abort(_RECEPTION_FAILURE_OUTCOME) elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION): - outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination] - self._abort(outcome) + outcome_kind = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME_KIND[ + ticket.termination] + self._abort( + _utilities.Outcome(outcome_kind, ticket.code, ticket.message)) elif ticket.sequence_number == self._lowest_unseen_sequence_number: self._process(ticket) else: diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py index ad9f6123d8..bdb9147e5b 100644 --- a/src/python/grpcio/grpc/framework/core/_termination.py +++ b/src/python/grpcio/grpc/framework/core/_termination.py @@ -33,6 +33,7 @@ import abc from grpc.framework.core import _constants from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.foundation import callable_util from grpc.framework.interfaces.base import base @@ -74,7 +75,8 @@ class _TerminationManager(TerminationManager): predicate: One of _invocation_completion_predicate or _service_completion_predicate to be used to determine when the operation has completed. - action: A behavior to pass the operation outcome on operation termination. + action: A behavior to pass the operation outcome's kind on operation + termination. pool: A thread pool. """ self._predicate = predicate @@ -82,14 +84,19 @@ class _TerminationManager(TerminationManager): self._pool = pool self._expiration_manager = None - self.outcome = None self._callbacks = [] + self._code = None + self._details = None self._emission_complete = False self._transmission_complete = False self._reception_complete = False self._ingestion_complete = False + # The None-ness of outcome is the operation-wide record of whether and how + # the operation has terminated. + self.outcome = None + def set_expiration_manager(self, expiration_manager): self._expiration_manager = expiration_manager @@ -106,8 +113,10 @@ class _TerminationManager(TerminationManager): act = callable_util.with_exceptions_logged( self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE) - if outcome is base.Outcome.LOCAL_FAILURE: - self._pool.submit(act, outcome) + # TODO(issue 3202): Don't call the local application's callbacks if it has + # previously shown a programming defect. + if False and outcome.kind is base.Outcome.Kind.LOCAL_FAILURE: + self._pool.submit(act, base.Outcome.Kind.LOCAL_FAILURE) else: def call_callbacks_and_act(callbacks, outcome): for callback in callbacks: @@ -115,9 +124,11 @@ class _TerminationManager(TerminationManager): callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE, outcome) if callback_outcome.exception is not None: - outcome = base.Outcome.LOCAL_FAILURE + act_outcome_kind = base.Outcome.Kind.LOCAL_FAILURE break - act(outcome) + else: + act_outcome_kind = outcome.kind + act(act_outcome_kind) self._pool.submit( callable_util.with_exceptions_logged( @@ -132,7 +143,9 @@ class _TerminationManager(TerminationManager): if self._predicate( self._emission_complete, self._transmission_complete, self._reception_complete, self._ingestion_complete): - self._terminate_and_notify(base.Outcome.COMPLETED) + self._terminate_and_notify( + _utilities.Outcome( + base.Outcome.Kind.COMPLETED, self._code, self._details)) return True else: return False @@ -163,10 +176,12 @@ class _TerminationManager(TerminationManager): else: return False - def reception_complete(self): + def reception_complete(self, code, details): """See superclass method for specification.""" if self.outcome is None: self._reception_complete = True + self._code = code + self._details = details self._perhaps_complete() def ingestion_complete(self): @@ -177,7 +192,8 @@ class _TerminationManager(TerminationManager): def expire(self): """See _interfaces.TerminationManager.expire for specification.""" - self._terminate_internal_only(base.Outcome.EXPIRED) + self._terminate_internal_only( + _utilities.Outcome(base.Outcome.Kind.EXPIRED, None, None)) def abort(self, outcome): """See _interfaces.TerminationManager.abort for specification.""" diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py index efef87dd4c..8f852cfe9f 100644 --- a/src/python/grpcio/grpc/framework/core/_transmission.py +++ b/src/python/grpcio/grpc/framework/core/_transmission.py @@ -29,14 +29,21 @@ """State and behavior for ticket transmission during an operation.""" +import collections +import enum + from grpc.framework.core import _constants from grpc.framework.core import _interfaces +from grpc.framework.core import _utilities from grpc.framework.foundation import callable_util from grpc.framework.interfaces.base import base from grpc.framework.interfaces.links import links _TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' +_TRANSMISSION_FAILURE_OUTCOME = _utilities.Outcome( + base.Outcome.Kind.TRANSMISSION_FAILURE, None, None) + def _explode_completion(completion): if completion is None: @@ -47,6 +54,31 @@ def _explode_completion(completion): links.Ticket.Termination.COMPLETION) +class _Abort( + collections.namedtuple( + '_Abort', ('kind', 'termination', 'code', 'details',))): + """Tracks whether the operation aborted and what is to be done about it. + + Attributes: + kind: A Kind value describing the overall kind of the _Abort. + termination: A links.Ticket.Termination value to be sent to the other side + of the operation. Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED. + code: A code value to be sent to the other side of the operation. Only + valid if kind is Kind.ABORTED_NOTIFY_NEEDED. + details: A details value to be sent to the other side of the operation. + Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED. + """ + + @enum.unique + class Kind(enum.Enum): + NOT_ABORTED = 'not aborted' + ABORTED_NOTIFY_NEEDED = 'aborted notify needed' + ABORTED_NO_NOTIFY = 'aborted no notify' + +_NOT_ABORTED = _Abort(_Abort.Kind.NOT_ABORTED, None, None, None) +_ABORTED_NO_NOTIFY = _Abort(_Abort.Kind.ABORTED_NO_NOTIFY, None, None, None) + + class TransmissionManager(_interfaces.TransmissionManager): """An _interfaces.TransmissionManager that sends links.Tickets.""" @@ -79,8 +111,7 @@ class TransmissionManager(_interfaces.TransmissionManager): self._initial_metadata = None self._payloads = [] self._completion = None - self._aborted = False - self._abortion_outcome = None + self._abort = _NOT_ABORTED self._transmitting = False def set_expiration_manager(self, expiration_manager): @@ -94,24 +125,15 @@ class TransmissionManager(_interfaces.TransmissionManager): A links.Ticket to be sent to the other side of the operation or None if there is nothing to be sent at this time. """ - if self._aborted: - if self._abortion_outcome is None: - return None - else: - termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[ - self._abortion_outcome] - if termination is None: - return None - else: - self._abortion_outcome = None - if self._completion is None: - code, message = None, None - else: - code, message = self._completion.code, self._completion.message - return links.Ticket( - self._operation_id, self._lowest_unused_sequence_number, None, - None, None, None, None, None, None, None, code, message, - termination, None) + if self._abort.kind is _Abort.Kind.ABORTED_NO_NOTIFY: + return None + elif self._abort.kind is _Abort.Kind.ABORTED_NOTIFY_NEEDED: + termination = self._abort.termination + code, details = self._abort.code, self._abort.details + self._abort = _ABORTED_NO_NOTIFY + return links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + None, None, None, None, None, None, code, details, termination, None) action = False # TODO(nathaniel): Support other subscriptions. @@ -174,8 +196,9 @@ class TransmissionManager(_interfaces.TransmissionManager): return else: with self._lock: + self._abort = _ABORTED_NO_NOTIFY if self._termination_manager.outcome is None: - self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE) + self._termination_manager.abort(_TRANSMISSION_FAILURE_OUTCOME) self._expiration_manager.terminate() return @@ -201,6 +224,9 @@ class TransmissionManager(_interfaces.TransmissionManager): def advance(self, initial_metadata, payload, completion, allowance): """See _interfaces.TransmissionManager.advance for specification.""" + if self._abort.kind is not _Abort.Kind.NOT_ABORTED: + return + effective_initial_metadata = initial_metadata effective_payload = payload effective_completion = completion @@ -246,7 +272,9 @@ class TransmissionManager(_interfaces.TransmissionManager): def timeout(self, timeout): """See _interfaces.TransmissionManager.timeout for specification.""" - if self._transmitting: + if self._abort.kind is not _Abort.Kind.NOT_ABORTED: + return + elif self._transmitting: self._timeout = timeout else: ticket = links.Ticket( @@ -257,7 +285,9 @@ class TransmissionManager(_interfaces.TransmissionManager): def allowance(self, allowance): """See _interfaces.TransmissionManager.allowance for specification.""" - if self._transmitting or not self._payloads: + if self._abort.kind is not _Abort.Kind.NOT_ABORTED: + return + elif self._transmitting or not self._payloads: self._remote_allowance += allowance else: self._remote_allowance += allowance - 1 @@ -281,22 +311,24 @@ class TransmissionManager(_interfaces.TransmissionManager): self._remote_complete = True self._local_allowance = 0 - def abort(self, outcome, code, message): + def abort(self, outcome): """See _interfaces.TransmissionManager.abort for specification.""" - if self._transmitting: - self._aborted, self._abortion_outcome = True, outcome - else: - self._aborted = True - if outcome is not None: - termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[ - outcome] - if termination is not None: - if self._completion is None: - code, message = None, None - else: - code, message = self._completion.code, self._completion.message + if self._abort.kind is _Abort.Kind.NOT_ABORTED: + if outcome is None: + self._abort = _ABORTED_NO_NOTIFY + else: + termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION.get( + outcome.kind) + if termination is None: + self._abort = _ABORTED_NO_NOTIFY + elif self._transmitting: + self._abort = _Abort( + _Abort.Kind.ABORTED_NOTIFY_NEEDED, termination, outcome.code, + outcome.details) + else: ticket = links.Ticket( self._operation_id, self._lowest_unused_sequence_number, None, - None, None, None, None, None, None, None, code, message, - termination, None) + None, None, None, None, None, None, None, outcome.code, + outcome.details, termination, None) self._transmit(ticket) + self._abort = _ABORTED_NO_NOTIFY diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py index 5b0d798751..abedc727e4 100644 --- a/src/python/grpcio/grpc/framework/core/_utilities.py +++ b/src/python/grpcio/grpc/framework/core/_utilities.py @@ -31,6 +31,8 @@ import collections +from grpc.framework.interfaces.base import base + class ServicerPackage( collections.namedtuple( @@ -44,3 +46,9 @@ class ServicerPackage( maximum_timeout: A float indicating the maximum length of time in seconds to allow for an operation. """ + + +class Outcome( + base.Outcome, + collections.namedtuple('Outcome', ('kind', 'code', 'details',))): + """A trivial implementation of base.Outcome.""" diff --git a/src/python/grpcio/grpc/framework/crust/_calls.py b/src/python/grpcio/grpc/framework/crust/_calls.py index f9077bedfe..4c6bf16f43 100644 --- a/src/python/grpcio/grpc/framework/crust/_calls.py +++ b/src/python/grpcio/grpc/framework/crust/_calls.py @@ -98,7 +98,7 @@ def blocking_unary_unary( rendezvous, unused_operation_context, unused_outcome = _invoke( end, group, method, timeout, initial_metadata, payload, True) if with_call: - return next(rendezvous, rendezvous) + return next(rendezvous), rendezvous else: return next(rendezvous) diff --git a/src/python/grpcio/grpc/framework/crust/_control.py b/src/python/grpcio/grpc/framework/crust/_control.py index 01de3c15bd..7bddf46a57 100644 --- a/src/python/grpcio/grpc/framework/crust/_control.py +++ b/src/python/grpcio/grpc/framework/crust/_control.py @@ -110,30 +110,31 @@ class _Termination( _NOT_TERMINATED = _Termination(False, None, None) -_OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR = { - base.Outcome.COMPLETED: lambda *unused_args: _Termination(True, None, None), - base.Outcome.CANCELLED: lambda *args: _Termination( +_OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR = { + base.Outcome.Kind.COMPLETED: lambda *unused_args: _Termination( + True, None, None), + base.Outcome.Kind.CANCELLED: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.CANCELLED, *args), face.CancellationError(*args)), - base.Outcome.EXPIRED: lambda *args: _Termination( + base.Outcome.Kind.EXPIRED: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.EXPIRED, *args), face.ExpirationError(*args)), - base.Outcome.LOCAL_SHUTDOWN: lambda *args: _Termination( + base.Outcome.Kind.LOCAL_SHUTDOWN: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.LOCAL_SHUTDOWN, *args), face.LocalShutdownError(*args)), - base.Outcome.REMOTE_SHUTDOWN: lambda *args: _Termination( + base.Outcome.Kind.REMOTE_SHUTDOWN: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.REMOTE_SHUTDOWN, *args), face.RemoteShutdownError(*args)), - base.Outcome.RECEPTION_FAILURE: lambda *args: _Termination( + base.Outcome.Kind.RECEPTION_FAILURE: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args), face.NetworkError(*args)), - base.Outcome.TRANSMISSION_FAILURE: lambda *args: _Termination( + base.Outcome.Kind.TRANSMISSION_FAILURE: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args), face.NetworkError(*args)), - base.Outcome.LOCAL_FAILURE: lambda *args: _Termination( + base.Outcome.Kind.LOCAL_FAILURE: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.LOCAL_FAILURE, *args), face.LocalError(*args)), - base.Outcome.REMOTE_FAILURE: lambda *args: _Termination( + base.Outcome.Kind.REMOTE_FAILURE: lambda *args: _Termination( True, face.Abortion(face.Abortion.Kind.REMOTE_FAILURE, *args), face.RemoteError(*args)), } @@ -247,13 +248,17 @@ class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call): else: initial_metadata = self._up_initial_metadata.value if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED: - terminal_metadata, code, details = None, None, None + terminal_metadata = None else: terminal_metadata = self._up_completion.value.terminal_metadata + if outcome.kind is base.Outcome.Kind.COMPLETED: code = self._up_completion.value.code details = self._up_completion.value.message - self._termination = _OPERATION_OUTCOME_TO_TERMINATION_CONSTRUCTOR[ - outcome](initial_metadata, terminal_metadata, code, details) + else: + code = outcome.code + details = outcome.details + self._termination = _OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR[ + outcome.kind](initial_metadata, terminal_metadata, code, details) self._condition.notify_all() diff --git a/src/python/grpcio/grpc/framework/crust/_service.py b/src/python/grpcio/grpc/framework/crust/_service.py index 2455a58f59..6ff7249e75 100644 --- a/src/python/grpcio/grpc/framework/crust/_service.py +++ b/src/python/grpcio/grpc/framework/crust/_service.py @@ -154,7 +154,7 @@ def adapt_multi_method(multi_method, pool): outcome = operation_context.add_termination_callback(rendezvous.set_outcome) if outcome is None: def in_pool(): - request_consumer = multi_method( + request_consumer = multi_method.service( group, method, rendezvous, _ServicerContext(rendezvous)) for request in rendezvous: request_consumer.consume(request) diff --git a/src/python/grpcio/grpc/framework/crust/implementations.py b/src/python/grpcio/grpc/framework/crust/implementations.py index 12f7e79641..d38fab8ba0 100644 --- a/src/python/grpcio/grpc/framework/crust/implementations.py +++ b/src/python/grpcio/grpc/framework/crust/implementations.py @@ -49,12 +49,12 @@ class _BaseServicer(base.Servicer): return adapted_method(output_operator, context) elif self._adapted_multi_method is not None: try: - return self._adapted_multi_method.service( + return self._adapted_multi_method( group, method, output_operator, context) except face.NoSuchMethodError: - raise base.NoSuchMethodError() + raise base.NoSuchMethodError(None, None) else: - raise base.NoSuchMethodError() + raise base.NoSuchMethodError(None, None) class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable): @@ -315,8 +315,11 @@ def servicer(method_implementations, multi_method_implementation, pool): """ adapted_implementations = _adapt_method_implementations( method_implementations, pool) - adapted_multi_method_implementation = _service.adapt_multi_method( - multi_method_implementation, pool) + if multi_method_implementation is None: + adapted_multi_method_implementation = None + else: + adapted_multi_method_implementation = _service.adapt_multi_method( + multi_method_implementation, pool) return _BaseServicer( adapted_implementations, adapted_multi_method_implementation) diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py index bc52efb4c5..0d9d6b464e 100644 --- a/src/python/grpcio/grpc/framework/interfaces/base/base.py +++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py @@ -40,7 +40,7 @@ applications choose. # threading is referenced from specification in this module. import abc import enum -import threading +import threading # pylint: disable=unused-import # abandonment is referenced from specification in this module. from grpc.framework.foundation import abandonment # pylint: disable=unused-import @@ -69,19 +69,30 @@ class NoSuchMethodError(Exception): self.details = details -@enum.unique -class Outcome(enum.Enum): - """Operation outcomes.""" +class Outcome(object): + """The outcome of an operation. - COMPLETED = 'completed' - CANCELLED = 'cancelled' - EXPIRED = 'expired' - LOCAL_SHUTDOWN = 'local shutdown' - REMOTE_SHUTDOWN = 'remote shutdown' - RECEPTION_FAILURE = 'reception failure' - TRANSMISSION_FAILURE = 'transmission failure' - LOCAL_FAILURE = 'local failure' - REMOTE_FAILURE = 'remote failure' + Attributes: + kind: A Kind value coarsely identifying how the operation terminated. + code: An application-specific code value or None if no such value was + provided. + details: An application-specific details value or None if no such value was + provided. + """ + + @enum.unique + class Kind(enum.Enum): + """Ways in which an operation can terminate.""" + + COMPLETED = 'completed' + CANCELLED = 'cancelled' + EXPIRED = 'expired' + LOCAL_SHUTDOWN = 'local shutdown' + REMOTE_SHUTDOWN = 'remote shutdown' + RECEPTION_FAILURE = 'reception failure' + TRANSMISSION_FAILURE = 'transmission failure' + LOCAL_FAILURE = 'local failure' + REMOTE_FAILURE = 'remote failure' class Completion(object): @@ -294,8 +305,8 @@ class End(object): """Reports the number of terminated operations broken down by outcome. Returns: - A dictionary from Outcome value to an integer identifying the number - of operations that terminated with that outcome. + A dictionary from Outcome.Kind value to an integer identifying the number + of operations that terminated with that outcome kind. """ raise NotImplementedError() diff --git a/src/python/grpcio/setup.py b/src/python/grpcio/setup.py index caa71a4f7c..4735ce080b 100644 --- a/src/python/grpcio/setup.py +++ b/src/python/grpcio/setup.py @@ -104,7 +104,7 @@ _COMMAND_CLASS = { setuptools.setup( name='grpcio', - version='0.10.0a0', + version='0.11.0', ext_modules=_EXTENSION_MODULES, packages=list(_PACKAGES), package_dir=_PACKAGE_DIRECTORIES, diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py index f0bd989ea6..cafb6b6eae 100644 --- a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py +++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py @@ -38,6 +38,7 @@ import unittest from grpc._adapter import _intermediary_low from grpc._links import invocation from grpc._links import service +from grpc.beta import interfaces as beta_interfaces from grpc.framework.core import implementations from grpc.framework.interfaces.base import utilities from grpc_test import test_common as grpc_test_common @@ -45,8 +46,6 @@ from grpc_test.framework.common import test_constants from grpc_test.framework.interfaces.base import test_cases from grpc_test.framework.interfaces.base import test_interfaces -_CODE = _intermediary_low.Code.OK - class _SerializationBehaviors( collections.namedtuple( @@ -124,8 +123,8 @@ class _Implementation(test_interfaces.Implementation): def service_completion(self): return utilities.completion( - grpc_test_common.SERVICE_TERMINAL_METADATA, _CODE, - grpc_test_common.DETAILS) + grpc_test_common.SERVICE_TERMINAL_METADATA, + beta_interfaces.StatusCode.OK, grpc_test_common.DETAILS) def metadata_transmitted(self, original_metadata, transmitted_metadata): return original_metadata is None or grpc_test_common.metadata_transmitted( diff --git a/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py b/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py index 28c0619f7c..a4d4dee38c 100644 --- a/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py +++ b/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py @@ -35,6 +35,7 @@ import unittest from grpc._adapter import _intermediary_low from grpc._links import invocation from grpc._links import service +from grpc.beta import interfaces as beta_interfaces from grpc.framework.core import implementations as core_implementations from grpc.framework.crust import implementations as crust_implementations from grpc.framework.foundation import logging_pool @@ -139,7 +140,7 @@ class _Implementation(test_interfaces.Implementation): return grpc_test_common.SERVICE_TERMINAL_METADATA def code(self): - return _intermediary_low.Code.OK + return beta_interfaces.StatusCode.OK def details(self): return grpc_test_common.DETAILS diff --git a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py index 716323cc20..77e83d5561 100644 --- a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py +++ b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py @@ -34,6 +34,7 @@ import unittest from grpc._adapter import _intermediary_low from grpc._links import invocation from grpc._links import service +from grpc.beta import interfaces as beta_interfaces from grpc.framework.interfaces.links import links from grpc_test import test_common from grpc_test._links import _proto_scenarios @@ -93,7 +94,8 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase): return None, None def create_service_completion(self): - return _intermediary_low.Code.OK, 'An exuberant test "details" message!' + return ( + beta_interfaces.StatusCode.OK, b'An exuberant test "details" message!') def assertMetadataTransmitted(self, original_metadata, transmitted_metadata): self.assertTrue( @@ -110,7 +112,7 @@ class RoundTripTest(unittest.TestCase): test_group = 'test package.Test Group' test_method = 'test method' identity_transformation = {(test_group, test_method): _IDENTITY} - test_code = _intermediary_low.Code.OK + test_code = beta_interfaces.StatusCode.OK test_message = 'a test message' service_link = service.service_link( @@ -150,11 +152,13 @@ class RoundTripTest(unittest.TestCase): self.assertIs( invocation_mate.tickets()[-1].termination, links.Ticket.Termination.COMPLETION) + self.assertIs(invocation_mate.tickets()[-1].code, test_code) + self.assertEqual(invocation_mate.tickets()[-1].message, test_message) def _perform_scenario_test(self, scenario): test_operation_id = object() test_group, test_method = scenario.group_and_method() - test_code = _intermediary_low.Code.OK + test_code = beta_interfaces.StatusCode.OK test_message = 'a scenario test message' service_link = service.service_link( diff --git a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py index ce4c59c0ee..e9087a7949 100644 --- a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py @@ -32,8 +32,8 @@ import collections import unittest -from grpc._adapter import _intermediary_low from grpc.beta import beta +from grpc.beta import interfaces from grpc_test import resources from grpc_test import test_common as grpc_test_common from grpc_test.beta import test_utilities @@ -116,7 +116,7 @@ class _Implementation(test_interfaces.Implementation): return grpc_test_common.SERVICE_TERMINAL_METADATA def code(self): - return _intermediary_low.Code.OK + return interfaces.StatusCode.OK def details(self): return grpc_test_common.DETAILS diff --git a/src/python/grpcio_test/grpc_test/beta/_not_found_test.py b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py new file mode 100644 index 0000000000..ecd10f2175 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py @@ -0,0 +1,75 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests of RPC-method-not-found behavior.""" + +import unittest + +from grpc.beta import beta +from grpc.beta import interfaces +from grpc.framework.interfaces.face import face +from grpc_test.framework.common import test_constants + + +class NotFoundTest(unittest.TestCase): + + def setUp(self): + self._server = beta.server({}) + port = self._server.add_insecure_port('[::]:0') + channel = beta.create_insecure_channel('localhost', port) + self._generic_stub = beta.generic_stub(channel) + self._server.start() + + def tearDown(self): + self._server.stop(0).wait() + self._generic_stub = None + + def test_blocking_unary_unary_not_found(self): + with self.assertRaises(face.LocalError) as exception_assertion_context: + self._generic_stub.blocking_unary_unary( + 'groop', 'meffod', b'abc', test_constants.LONG_TIMEOUT, + with_call=True) + self.assertIs( + exception_assertion_context.exception.code, + interfaces.StatusCode.UNIMPLEMENTED) + + def test_future_stream_unary_not_found(self): + rpc_future = self._generic_stub.future_stream_unary( + 'grupe', 'mevvod', b'def', test_constants.LONG_TIMEOUT) + with self.assertRaises(face.LocalError) as exception_assertion_context: + rpc_future.result() + self.assertIs( + exception_assertion_context.exception.code, + interfaces.StatusCode.UNIMPLEMENTED) + self.assertIs( + rpc_future.exception().code, interfaces.StatusCode.UNIMPLEMENTED) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py index e4d2a7a0d7..46a01876d8 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_control.py @@ -236,8 +236,8 @@ class Instruction( collections.namedtuple( 'Instruction', ('kind', 'advance_args', 'advance_kwargs', 'conclude_success', - 'conclude_message', 'conclude_invocation_outcome', - 'conclude_service_outcome',))): + 'conclude_message', 'conclude_invocation_outcome_kind', + 'conclude_service_outcome_kind',))): """""" @enum.unique @@ -532,24 +532,24 @@ class _SequenceController(Controller): self._state.service_side_outcome = outcome if self._todo is not None or self._remaining_elements: self._failed('Premature service-side outcome %s!' % (outcome,)) - elif outcome is not self._sequence.outcome.service: + elif outcome.kind is not self._sequence.outcome_kinds.service: self._failed( - 'Incorrect service-side outcome: %s should have been %s' % ( - outcome, self._sequence.outcome.service)) + 'Incorrect service-side outcome kind: %s should have been %s' % ( + outcome.kind, self._sequence.outcome_kinds.service)) elif self._state.invocation_side_outcome is not None: - self._passed(self._state.invocation_side_outcome, outcome) + self._passed(self._state.invocation_side_outcome.kind, outcome.kind) def invocation_on_termination(self, outcome): with self._condition: self._state.invocation_side_outcome = outcome if self._todo is not None or self._remaining_elements: self._failed('Premature invocation-side outcome %s!' % (outcome,)) - elif outcome is not self._sequence.outcome.invocation: + elif outcome.kind is not self._sequence.outcome_kinds.invocation: self._failed( - 'Incorrect invocation-side outcome: %s should have been %s' % ( - outcome, self._sequence.outcome.invocation)) + 'Incorrect invocation-side outcome kind: %s should have been %s' % ( + outcome.kind, self._sequence.outcome_kinds.invocation)) elif self._state.service_side_outcome is not None: - self._passed(outcome, self._state.service_side_outcome) + self._passed(outcome.kind, self._state.service_side_outcome.kind) class _SequenceControllerCreator(ControllerCreator): diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py index 1d77aaebe6..f547d91681 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/_sequence.py @@ -103,13 +103,14 @@ class Element(collections.namedtuple('Element', ('kind', 'transmission',))): SERVICE_FAILURE = 'service failure' -class Outcome(collections.namedtuple('Outcome', ('invocation', 'service',))): +class OutcomeKinds( + collections.namedtuple('Outcome', ('invocation', 'service',))): """A description of the expected outcome of an operation test. Attributes: - invocation: The base.Outcome value expected on the invocation side of the - operation. - service: The base.Outcome value expected on the service side of the + invocation: The base.Outcome.Kind value expected on the invocation side of + the operation. + service: The base.Outcome.Kind value expected on the service side of the operation. """ @@ -117,7 +118,8 @@ class Outcome(collections.namedtuple('Outcome', ('invocation', 'service',))): class Sequence( collections.namedtuple( 'Sequence', - ('name', 'maximum_duration', 'invocation', 'elements', 'outcome',))): + ('name', 'maximum_duration', 'invocation', 'elements', + 'outcome_kinds',))): """Describes at a high level steps to perform in a test. Attributes: @@ -128,7 +130,8 @@ class Sequence( under test. elements: A sequence of Element values describing at coarse granularity actions to take during the operation under test. - outcome: An Outcome value describing the expected outcome of the test. + outcome_kinds: An OutcomeKinds value describing the expected outcome kinds + of the test. """ _EASY = Sequence( @@ -139,7 +142,7 @@ _EASY = Sequence( Element( Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, True)), ), - Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED)) + OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED)) _PEASY = Sequence( 'Peasy', @@ -154,7 +157,7 @@ _PEASY = Sequence( Element( Element.Kind.SERVICE_TRANSMISSION, Transmission(False, True, True)), ), - Outcome(base.Outcome.COMPLETED, base.Outcome.COMPLETED)) + OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED)) # TODO(issue 2959): Finish this test suite. This tuple of sequences should diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py index 87332cf612..5065a3f38a 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py @@ -44,7 +44,8 @@ from grpc_test.framework.interfaces.base import test_interfaces _SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True)) -_EMPTY_OUTCOME_DICT = {outcome: 0 for outcome in base.Outcome} +_EMPTY_OUTCOME_KIND_DICT = { + outcome_kind: 0 for outcome_kind in base.Outcome.Kind} class _Serialization(test_interfaces.Serialization): @@ -119,7 +120,7 @@ class _Operator(base.Operator): class _Servicer(base.Servicer): - """An base.Servicer with instrumented for testing.""" + """A base.Servicer with instrumented for testing.""" def __init__(self, group, method, controllers, pool): self._condition = threading.Condition() @@ -223,11 +224,12 @@ class _OperationTest(unittest.TestCase): self.assertTrue( instruction.conclude_success, msg=instruction.conclude_message) - expected_invocation_stats = dict(_EMPTY_OUTCOME_DICT) - expected_invocation_stats[instruction.conclude_invocation_outcome] += 1 + expected_invocation_stats = dict(_EMPTY_OUTCOME_KIND_DICT) + expected_invocation_stats[ + instruction.conclude_invocation_outcome_kind] += 1 self.assertDictEqual(expected_invocation_stats, invocation_stats) - expected_service_stats = dict(_EMPTY_OUTCOME_DICT) - expected_service_stats[instruction.conclude_service_outcome] += 1 + expected_service_stats = dict(_EMPTY_OUTCOME_KIND_DICT) + expected_service_stats[instruction.conclude_service_outcome_kind] += 1 self.assertDictEqual(expected_service_stats, service_stats) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py index b7dd5d4d17..2d2a081955 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -82,8 +82,8 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): for test_messages in test_messages_sequence: request = test_messages.request() - response = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) + response, call = self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT, with_call=True) test_messages.verify(request, response, self) @@ -105,8 +105,8 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): for test_messages in test_messages_sequence: requests = test_messages.requests() - response = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) + response, call = self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT, with_call=True) test_messages.verify(requests, response, self) |