diff options
author | Masood Malekghassemi <soltanmm@users.noreply.github.com> | 2015-08-23 15:14:36 -0700 |
---|---|---|
committer | Masood Malekghassemi <soltanmm@users.noreply.github.com> | 2015-08-23 15:14:36 -0700 |
commit | 03d281ab1b59185959afff7621d69a9b77387cfa (patch) | |
tree | c38aa6d7d2a662675dfba91ac09fc8e01bee5649 /src/python | |
parent | b7e55a20023b32177cb9b09e8ef0dc0d3cd0aa85 (diff) | |
parent | 71e29ef459378f404ad4377e960792435864f878 (diff) |
Merge pull request #2968 from nathanielmanistaatgoogle/nathaniel-core
The RPC Framework core package.
Diffstat (limited to 'src/python')
21 files changed, 2767 insertions, 14 deletions
diff --git a/src/python/grpcio/grpc/framework/core/__init__.py b/src/python/grpcio/grpc/framework/core/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py new file mode 100644 index 0000000000..d3be3a4c4a --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_constants.py @@ -0,0 +1,59 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Private constants for the package.""" + +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.links import links + +TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = { + base.Subscription.Kind.NONE: links.Ticket.Subscription.NONE, + base.Subscription.Kind.TERMINATION_ONLY: + links.Ticket.Subscription.TERMINATION, + base.Subscription.Kind.FULL: links.Ticket.Subscription.FULL, + } + +# Mapping from abortive operation outcome to ticket termination to be +# sent to the other side of the operation, or None to indicate that no +# ticket should be sent to the other side in the event of such an +# outcome. +ABORTION_OUTCOME_TO_TICKET_TERMINATION = { + base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION, + base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION, + base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN, + base.Outcome.REMOTE_SHUTDOWN: None, + base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE, + base.Outcome.TRANSMISSION_FAILURE: None, + base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE, + base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE, +} + +INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:' +TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE = ( + 'Exception calling termination callback!') diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py new file mode 100644 index 0000000000..24a12b612e --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_context.py @@ -0,0 +1,92 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation context.""" + +import time + +# _interfaces is referenced from specification in this module. +from grpc.framework.core import _interfaces # pylint: disable=unused-import +from grpc.framework.interfaces.base import base + + +class OperationContext(base.OperationContext): + """An implementation of interfaces.OperationContext.""" + + def __init__( + self, lock, termination_manager, transmission_manager, + expiration_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + + def _abort(self, outcome): + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + def outcome(self): + """See base.OperationContext.outcome for specification.""" + with self._lock: + return self._termination_manager.outcome + + def add_termination_callback(self, callback): + """See base.OperationContext.add_termination_callback.""" + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.add_callback(callback) + return None + else: + return self._termination_manager.outcome + + def time_remaining(self): + """See base.OperationContext.time_remaining for specification.""" + with self._lock: + deadline = self._expiration_manager.deadline() + return max(0.0, deadline - time.time()) + + def cancel(self): + """See base.OperationContext.cancel for specification.""" + self._abort(base.Outcome.CANCELLED) + + def fail(self, exception): + """See base.OperationContext.fail for specification.""" + self._abort(base.Outcome.LOCAL_FAILURE) diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py new file mode 100644 index 0000000000..7c702ab2ce --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_emission.py @@ -0,0 +1,97 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for handling emitted values.""" + +from grpc.framework.core import _interfaces +from grpc.framework.interfaces.base import base + + +class EmissionManager(_interfaces.EmissionManager): + """An EmissionManager implementation.""" + + def __init__( + self, lock, termination_manager, transmission_manager, + expiration_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + self._ingestion_manager = None + + self._initial_metadata_seen = False + self._payload_seen = False + self._completion_seen = False + + def set_ingestion_manager(self, ingestion_manager): + """Sets the ingestion manager with which this manager will cooperate. + + Args: + ingestion_manager: The _interfaces.IngestionManager for the operation. + """ + self._ingestion_manager = ingestion_manager + + def advance( + self, initial_metadata=None, payload=None, completion=None, + allowance=None): + initial_metadata_present = initial_metadata is not None + payload_present = payload is not None + completion_present = completion is not None + allowance_present = allowance is not None + with self._lock: + if self._termination_manager.outcome is None: + if (initial_metadata_present and ( + self._initial_metadata_seen or self._payload_seen or + self._completion_seen) or + payload_present and self._completion_seen or + completion_present and self._completion_seen or + allowance_present and allowance <= 0): + self._termination_manager.abort(base.Outcome.LOCAL_FAILURE) + self._transmission_manager.abort(base.Outcome.LOCAL_FAILURE) + self._expiration_manager.terminate() + else: + self._initial_metadata_seen |= initial_metadata_present + self._payload_seen |= payload_present + self._completion_seen |= completion_present + if completion_present: + self._termination_manager.emission_complete() + self._ingestion_manager.local_emissions_done() + self._transmission_manager.advance( + initial_metadata, payload, completion, allowance) + if allowance_present: + self._ingestion_manager.add_local_allowance(allowance) diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py new file mode 100644 index 0000000000..fb2c532df6 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_end.py @@ -0,0 +1,251 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Implementation of base.End.""" + +import abc +import enum +import threading +import uuid + +from grpc.framework.core import _operation +from grpc.framework.core import _utilities +from grpc.framework.foundation import callable_util +from grpc.framework.foundation import later +from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.links import links +from grpc.framework.interfaces.links import utilities + +_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!' + + +class End(base.End, links.Link): + """A bridge between base.End and links.Link. + + Implementations of this interface translate arriving tickets into + calls on application objects implementing base interfaces and + translate calls from application objects implementing base interfaces + into tickets sent to a joined link. + """ + __metaclass__ = abc.ABCMeta + + +class _Cycle(object): + """State for a single start-stop End lifecycle.""" + + def __init__(self, pool): + self.pool = pool + self.grace = False + self.futures = [] + self.operations = {} + self.idle_actions = [] + + +def _abort(operations): + for operation in operations: + operation.abort(base.Outcome.LOCAL_SHUTDOWN) + + +def _cancel_futures(futures): + for future in futures: + futures.cancel() + + +def _future_shutdown(lock, cycle, event): + def in_future(): + with lock: + _abort(cycle.operations.values()) + _cancel_futures(cycle.futures) + pool = cycle.pool + cycle.pool.shutdown(wait=True) + return in_future + + +def _termination_action(lock, stats, operation_id, cycle): + """Constructs the termination action for a single operation. + + Args: + lock: A lock to hold during the termination action. + states: A mapping from base.Outcome values to integers to increment with + the outcome given to the termination action. + operation_id: The operation ID for the termination action. + cycle: A _Cycle value to be updated during the termination action. + + Returns: + A callable that takes an operation outcome as its sole parameter and that + should be used as the termination action for the operation associated + with the given operation ID. + """ + def termination_action(outcome): + with lock: + stats[outcome] += 1 + cycle.operations.pop(operation_id, None) + if not cycle.operations: + for action in cycle.idle_actions: + cycle.pool.submit(action) + cycle.idle_actions = [] + if cycle.grace: + _cancel_futures(cycle.futures) + return termination_action + + +class _End(End): + """An End implementation.""" + + def __init__(self, servicer_package): + """Constructor. + + Args: + servicer_package: A _ServicerPackage for servicing operations or None if + this end will not be used to service operations. + """ + self._lock = threading.Condition() + self._servicer_package = servicer_package + + self._stats = {outcome: 0 for outcome in base.Outcome} + + self._mate = None + + self._cycle = None + + def start(self): + """See base.End.start for specification.""" + with self._lock: + if self._cycle is not None: + raise ValueError('Tried to start a not-stopped End!') + else: + self._cycle = _Cycle(logging_pool.pool(1)) + + def stop(self, grace): + """See base.End.stop for specification.""" + with self._lock: + if self._cycle is None: + event = threading.Event() + event.set() + return event + elif not self._cycle.operations: + event = threading.Event() + self._cycle.pool.submit(event.set) + self._cycle.pool.shutdown(wait=False) + self._cycle = None + return event + else: + self._cycle.grace = True + event = threading.Event() + self._cycle.idle_actions.append(event.set) + if 0 < grace: + future = later.later( + grace, _future_shutdown(self._lock, self._cycle, event)) + self._cycle.futures.append(future) + else: + _abort(self._cycle.operations.values()) + return event + + def operate( + self, group, method, subscription, timeout, initial_metadata=None, + payload=None, completion=None): + """See base.End.operate for specification.""" + operation_id = uuid.uuid4() + with self._lock: + if self._cycle is None or self._cycle.grace: + raise ValueError('Can\'t operate on stopped or stopping End!') + termination_action = _termination_action( + self._lock, self._stats, operation_id, self._cycle) + operation = _operation.invocation_operate( + operation_id, group, method, subscription, timeout, initial_metadata, + payload, completion, self._mate.accept_ticket, termination_action, + self._cycle.pool) + self._cycle.operations[operation_id] = operation + return operation.context, operation.operator + + def operation_stats(self): + """See base.End.operation_stats for specification.""" + with self._lock: + return dict(self._stats) + + def add_idle_action(self, action): + """See base.End.add_idle_action for specification.""" + with self._lock: + if self._cycle is None: + raise ValueError('Can\'t add idle action to stopped End!') + action_with_exceptions_logged = callable_util.with_exceptions_logged( + action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE) + if self._cycle.operations: + self._cycle.idle_actions.append(action_with_exceptions_logged) + else: + self._cycle.pool.submit(action_with_exceptions_logged) + + def accept_ticket(self, ticket): + """See links.Link.accept_ticket for specification.""" + with self._lock: + if self._cycle is not None and not self._cycle.grace: + operation = self._cycle.operations.get(ticket.operation_id) + if operation is not None: + operation.handle_ticket(ticket) + elif self._servicer_package is not None: + termination_action = _termination_action( + self._lock, self._stats, ticket.operation_id, self._cycle) + operation = _operation.service_operate( + self._servicer_package, ticket, self._mate.accept_ticket, + termination_action, self._cycle.pool) + if operation is not None: + self._cycle.operations[ticket.operation_id] = operation + + def join_link(self, link): + """See links.Link.join_link for specification.""" + with self._lock: + self._mate = utilities.NULL_LINK if link is None else link + + +def serviceless_end_link(): + """Constructs an End usable only for invoking operations. + + Returns: + An End usable for translating operations into ticket exchange. + """ + return _End(None) + + +def serviceful_end_link(servicer, default_timeout, maximum_timeout): + """Constructs an End capable of servicing operations. + + Args: + servicer: An interfaces.Servicer for servicing operations. + default_timeout: A length of time in seconds to be used as the default + time alloted for a single operation. + maximum_timeout: A length of time in seconds to be used as the maximum + time alloted for a single operation. + + Returns: + An End capable of servicing the operations requested of it through ticket + exchange. + """ + return _End( + _utilities.ServicerPackage(servicer, default_timeout, maximum_timeout)) diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py new file mode 100644 index 0000000000..d94bdf2d2b --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_expiration.py @@ -0,0 +1,152 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation expiration.""" + +import time + +from grpc.framework.core import _interfaces +from grpc.framework.foundation import later +from grpc.framework.interfaces.base import base + + +class _ExpirationManager(_interfaces.ExpirationManager): + """An implementation of _interfaces.ExpirationManager.""" + + def __init__( + self, commencement, timeout, maximum_timeout, lock, termination_manager, + transmission_manager): + """Constructor. + + Args: + commencement: The time in seconds since the epoch at which the operation + began. + timeout: A length of time in seconds to allow for the operation to run. + maximum_timeout: The maximum length of time in seconds to allow for the + operation to run despite what is requested via this object's + change_timout method. + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._commencement = commencement + self._maximum_timeout = maximum_timeout + + self._timeout = timeout + self._deadline = commencement + timeout + self._index = None + self._future = None + + def _expire(self, index): + def expire(): + with self._lock: + if self._future is not None and index == self._index: + self._future = None + self._termination_manager.expire() + self._transmission_manager.abort(base.Outcome.EXPIRED) + return expire + + def start(self): + self._index = 0 + self._future = later.later(self._timeout, self._expire(0)) + + def change_timeout(self, timeout): + if self._future is not None and timeout != self._timeout: + self._future.cancel() + new_timeout = min(timeout, self._maximum_timeout) + new_index = self._index + 1 + self._timeout = new_timeout + self._deadline = self._commencement + new_timeout + self._index = new_index + delay = self._deadline - time.time() + self._future = later.later(delay, self._expire(new_index)) + if new_timeout != timeout: + self._transmission_manager.timeout(new_timeout) + + def deadline(self): + return self._deadline + + def terminate(self): + if self._future: + self._future.cancel() + self._future = None + self._deadline_index = None + + +def invocation_expiration_manager( + timeout, lock, termination_manager, transmission_manager): + """Creates an _interfaces.ExpirationManager appropriate for front-side use. + + Args: + timeout: A length of time in seconds to allow for the operation to run. + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + + Returns: + An _interfaces.ExpirationManager appropriate for invocation-side use. + """ + expiration_manager = _ExpirationManager( + time.time(), timeout, timeout, lock, termination_manager, + transmission_manager) + expiration_manager.start() + return expiration_manager + + +def service_expiration_manager( + timeout, default_timeout, maximum_timeout, lock, termination_manager, + transmission_manager): + """Creates an _interfaces.ExpirationManager appropriate for back-side use. + + Args: + timeout: A length of time in seconds to allow for the operation to run. May + be None in which case default_timeout will be used. + default_timeout: The default length of time in seconds to allow for the + operation to run if the front-side customer has not specified such a value + (or if the value they specified is not yet known). + maximum_timeout: The maximum length of time in seconds to allow for the + operation to run. + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + + Returns: + An _interfaces.ExpirationManager appropriate for service-side use. + """ + expiration_manager = _ExpirationManager( + time.time(), default_timeout if timeout is None else timeout, + maximum_timeout, lock, termination_manager, transmission_manager) + expiration_manager.start() + return expiration_manager diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py new file mode 100644 index 0000000000..59f7f8adc8 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_ingestion.py @@ -0,0 +1,410 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for ingestion during an operation.""" + +import abc +import collections + +from grpc.framework.core import _constants +from grpc.framework.core import _interfaces +from grpc.framework.foundation import abandonment +from grpc.framework.foundation import callable_util +from grpc.framework.interfaces.base import base + +_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!' +_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!' + + +class _SubscriptionCreation(collections.namedtuple( + '_SubscriptionCreation', ('subscription', 'remote_error', 'abandoned'))): + """A sum type for the outcome of ingestion initialization. + + Either subscription will be non-None, remote_error will be True, or abandoned + will be True. + + Attributes: + subscription: A base.Subscription describing the customer's interest in + operation values from the other side. + remote_error: A boolean indicating that the subscription could not be + created due to an error on the remote side of the operation. + abandoned: A boolean indicating that subscription creation was abandoned. + """ + + +class _SubscriptionCreator(object): + """Common specification of subscription-creating behavior.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def create(self, group, method): + """Creates the base.Subscription of the local customer. + + Any exceptions raised by this method should be attributed to and treated as + defects in the customer code called by this method. + + Args: + group: The group identifier of the operation. + method: The method identifier of the operation. + + Returns: + A _SubscriptionCreation describing the result of subscription creation. + """ + raise NotImplementedError() + + +class _ServiceSubscriptionCreator(_SubscriptionCreator): + """A _SubscriptionCreator appropriate for service-side use.""" + + def __init__(self, servicer, operation_context, output_operator): + """Constructor. + + Args: + servicer: The base.Servicer that will service the operation. + operation_context: A base.OperationContext for the operation to be passed + to the customer. + output_operator: A base.Operator for the operation to be passed to the + customer and to be called by the customer to accept operation data + emitted by the customer. + """ + self._servicer = servicer + self._operation_context = operation_context + self._output_operator = output_operator + + def create(self, group, method): + try: + subscription = self._servicer.service( + group, method, self._operation_context, self._output_operator) + except base.NoSuchMethodError: + return _SubscriptionCreation(None, True, False) + except abandonment.Abandoned: + return _SubscriptionCreation(None, False, True) + else: + return _SubscriptionCreation(subscription, False, False) + + +def _wrap(behavior): + def wrapped(*args, **kwargs): + try: + behavior(*args, **kwargs) + except abandonment.Abandoned: + return False + else: + return True + return wrapped + + +class _IngestionManager(_interfaces.IngestionManager): + """An implementation of _interfaces.IngestionManager.""" + + def __init__( + self, lock, pool, subscription, subscription_creator, termination_manager, + transmission_manager, expiration_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + subscription: A base.Subscription describing the customer's interest in + operation values from the other side. May be None if + subscription_creator is not None. + subscription_creator: A _SubscriptionCreator wrapping the portion of + customer code that when called returns the base.Subscription describing + the customer's interest in operation values from the other side. May be + None if subscription is not None. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + """ + self._lock = lock + self._pool = pool + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + + if subscription is None: + self._subscription_creator = subscription_creator + self._wrapped_operator = None + elif subscription.kind is base.Subscription.Kind.FULL: + self._subscription_creator = None + self._wrapped_operator = _wrap(subscription.operator.advance) + else: + # TODO(nathaniel): Support other subscriptions. + raise ValueError('Unsupported subscription "%s"!' % subscription.kind) + self._pending_initial_metadata = None + self._pending_payloads = [] + self._pending_completion = None + self._local_allowance = 1 + # A nonnegative integer or None, with None indicating that the local + # customer is done emitting anyway so there's no need to bother it by + # informing it that the remote customer has granted it further permission to + # emit. + self._remote_allowance = 0 + self._processing = False + + def _abort_internal_only(self): + self._subscription_creator = None + self._wrapped_operator = None + self._pending_initial_metadata = None + self._pending_payloads = None + self._pending_completion = None + + def _abort_and_notify(self, outcome): + self._abort_internal_only() + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + def _operator_next(self): + """Computes the next step for full-subscription ingestion. + + Returns: + An initial_metadata, payload, completion, allowance, continue quintet + indicating what operation values (if any) are available to pass into + customer code and whether or not there is anything immediately + actionable to call customer code to do. + """ + if self._wrapped_operator is None: + return None, None, None, None, False + else: + initial_metadata, payload, completion, allowance, action = [None] * 5 + if self._pending_initial_metadata is not None: + initial_metadata = self._pending_initial_metadata + self._pending_initial_metadata = None + action = True + if self._pending_payloads and 0 < self._local_allowance: + payload = self._pending_payloads.pop(0) + self._local_allowance -= 1 + action = True + if not self._pending_payloads and self._pending_completion is not None: + completion = self._pending_completion + self._pending_completion = None + action = True + if self._remote_allowance is not None and 0 < self._remote_allowance: + allowance = self._remote_allowance + self._remote_allowance = 0 + action = True + return initial_metadata, payload, completion, allowance, bool(action) + + def _operator_process( + self, wrapped_operator, initial_metadata, payload, + completion, allowance): + while True: + advance_outcome = callable_util.call_logging_exceptions( + wrapped_operator, _INGESTION_EXCEPTION_LOG_MESSAGE, + initial_metadata=initial_metadata, payload=payload, + completion=completion, allowance=allowance) + if advance_outcome.exception is None: + if advance_outcome.return_value: + with self._lock: + if self._termination_manager.outcome is not None: + return + if completion is not None: + self._termination_manager.ingestion_complete() + initial_metadata, payload, completion, allowance, moar = ( + self._operator_next()) + if not moar: + self._processing = False + return + else: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + return + else: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + return + + def _operator_post_create(self, subscription): + wrapped_operator = _wrap(subscription.operator.advance) + with self._lock: + if self._termination_manager.outcome is not None: + return + self._wrapped_operator = wrapped_operator + self._subscription_creator = None + metadata, payload, completion, allowance, moar = self._operator_next() + if not moar: + self._processing = False + return + self._operator_process( + wrapped_operator, metadata, payload, completion, allowance) + + def _create(self, subscription_creator, group, name): + outcome = callable_util.call_logging_exceptions( + subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE, + group, name) + if outcome.return_value is None: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + elif outcome.return_value.abandoned: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + elif outcome.return_value.remote_error: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.REMOTE_FAILURE) + elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL: + self._operator_post_create(outcome.return_value.subscription) + else: + # TODO(nathaniel): Support other subscriptions. + raise ValueError( + 'Unsupported "%s"!' % outcome.return_value.subscription.kind) + + def _store_advance(self, initial_metadata, payload, completion, allowance): + if initial_metadata is not None: + self._pending_initial_metadata = initial_metadata + if payload is not None: + self._pending_payloads.append(payload) + if completion is not None: + self._pending_completion = completion + if allowance is not None and self._remote_allowance is not None: + self._remote_allowance += allowance + + def _operator_advance(self, initial_metadata, payload, completion, allowance): + if self._processing: + self._store_advance(initial_metadata, payload, completion, allowance) + else: + action = False + if initial_metadata is not None: + action = True + if payload is not None: + if 0 < self._local_allowance: + self._local_allowance -= 1 + action = True + else: + self._pending_payloads.append(payload) + payload = False + if completion is not None: + if self._pending_payloads: + self._pending_completion = completion + else: + action = True + if allowance is not None and self._remote_allowance is not None: + allowance += self._remote_allowance + self._remote_allowance = 0 + action = True + if action: + self._pool.submit( + callable_util.with_exceptions_logged( + self._operator_process, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._wrapped_operator, initial_metadata, payload, completion, + allowance) + + def set_group_and_method(self, group, method): + """See _interfaces.IngestionManager.set_group_and_method for spec.""" + if self._subscription_creator is not None and not self._processing: + self._pool.submit( + callable_util.with_exceptions_logged( + self._create, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._subscription_creator, group, method) + self._processing = True + + def add_local_allowance(self, allowance): + """See _interfaces.IngestionManager.add_local_allowance for spec.""" + if any((self._subscription_creator, self._wrapped_operator,)): + self._local_allowance += allowance + if not self._processing: + initial_metadata, payload, completion, allowance, moar = ( + self._operator_next()) + if moar: + self._pool.submit( + callable_util.with_exceptions_logged( + self._operator_process, + _constants.INTERNAL_ERROR_LOG_MESSAGE), + initial_metadata, payload, completion, allowance) + + def local_emissions_done(self): + self._remote_allowance = None + + def advance(self, initial_metadata, payload, completion, allowance): + """See _interfaces.IngestionManager.advance for specification.""" + if self._subscription_creator is not None: + self._store_advance(initial_metadata, payload, completion, allowance) + elif self._wrapped_operator is not None: + self._operator_advance(initial_metadata, payload, completion, allowance) + + +def invocation_ingestion_manager( + subscription, lock, pool, termination_manager, transmission_manager, + expiration_manager): + """Creates an IngestionManager appropriate for invocation-side use. + + Args: + subscription: A base.Subscription indicating the customer's interest in the + data and results from the service-side of the operation. + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + + Returns: + An IngestionManager appropriate for invocation-side use. + """ + return _IngestionManager( + lock, pool, subscription, None, termination_manager, transmission_manager, + expiration_manager) + + +def service_ingestion_manager( + servicer, operation_context, output_operator, lock, pool, + termination_manager, transmission_manager, expiration_manager): + """Creates an IngestionManager appropriate for service-side use. + + The returned IngestionManager will require its set_group_and_name method to be + called before its advance method may be called. + + Args: + servicer: A base.Servicer for servicing the operation. + operation_context: A base.OperationContext for the operation to be passed to + the customer. + output_operator: A base.Operator for the operation to be passed to the + customer and to be called by the customer to accept operation data output + by the customer. + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + + Returns: + An IngestionManager appropriate for service-side use. + """ + subscription_creator = _ServiceSubscriptionCreator( + servicer, operation_context, output_operator) + return _IngestionManager( + lock, pool, None, subscription_creator, termination_manager, + transmission_manager, expiration_manager) diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py new file mode 100644 index 0000000000..a626b9f767 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_interfaces.py @@ -0,0 +1,308 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Package-internal interfaces.""" + +import abc + +from grpc.framework.interfaces.base import base + + +class TerminationManager(object): + """An object responsible for handling the termination of an operation. + + Attributes: + outcome: None if the operation is active or a base.Outcome value if it has + terminated. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def add_callback(self, callback): + """Registers a callback to be called on operation termination. + + If the operation has already terminated the callback will not be called. + + Args: + callback: A callable that will be passed an interfaces.Outcome value. + + Returns: + None if the operation has not yet terminated and the passed callback will + be called when it does, or a base.Outcome value describing the operation + termination if the operation has terminated and the callback will not be + called as a result of this method call. + """ + raise NotImplementedError() + + @abc.abstractmethod + def emission_complete(self): + """Indicates that emissions from customer code have completed.""" + raise NotImplementedError() + + @abc.abstractmethod + def transmission_complete(self): + """Indicates that transmissions to the remote end are complete. + + Returns: + True if the operation has terminated or False if the operation remains + ongoing. + """ + raise NotImplementedError() + + @abc.abstractmethod + def reception_complete(self): + """Indicates that reception from the other side is complete.""" + raise NotImplementedError() + + @abc.abstractmethod + def ingestion_complete(self): + """Indicates that customer code ingestion of received values is complete.""" + raise NotImplementedError() + + @abc.abstractmethod + def expire(self): + """Indicates that the operation must abort because it has taken too long.""" + raise NotImplementedError() + + @abc.abstractmethod + def abort(self, outcome): + """Indicates that the operation must abort for the indicated reason. + + Args: + outcome: An interfaces.Outcome indicating operation abortion. + """ + raise NotImplementedError() + + +class TransmissionManager(object): + """A manager responsible for transmitting to the other end of an operation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def kick_off( + self, group, method, timeout, initial_metadata, payload, completion, + allowance): + """Transmits the values associated with operation invocation.""" + raise NotImplementedError() + + @abc.abstractmethod + def advance(self, initial_metadata, payload, completion, allowance): + """Accepts values for transmission to the other end of the operation. + + Args: + initial_metadata: An initial metadata value to be transmitted to the other + side of the operation. May only ever be non-None once. + payload: A payload value. + completion: A base.Completion value. May only ever be non-None in the last + transmission to be made to the other side. + allowance: A positive integer communicating the number of additional + payloads allowed to be transmitted from the other side to this side of + the operation, or None if no additional allowance is being granted in + this call. + """ + raise NotImplementedError() + + @abc.abstractmethod + def timeout(self, timeout): + """Accepts for transmission to the other side a new timeout value. + + Args: + timeout: A positive float used as the new timeout value for the operation + to be transmitted to the other side. + """ + raise NotImplementedError() + + @abc.abstractmethod + def allowance(self, allowance): + """Indicates to this manager that the remote customer is allowing payloads. + + Args: + allowance: A positive integer indicating the number of additional payloads + the remote customer is allowing to be transmitted from this side of the + operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def remote_complete(self): + """Indicates to this manager that data from the remote side is complete.""" + raise NotImplementedError() + + @abc.abstractmethod + def abort(self, outcome): + """Indicates that the operation has aborted. + + Args: + outcome: An interfaces.Outcome for the operation. If None, indicates that + the operation abortion should not be communicated to the other side of + the operation. + """ + raise NotImplementedError() + + +class ExpirationManager(object): + """A manager responsible for aborting the operation if it runs out of time.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def change_timeout(self, timeout): + """Changes the timeout allotted for the operation. + + Operation duration is always measure from the beginning of the operation; + calling this method changes the operation's allotted time to timeout total + seconds, not timeout seconds from the time of this method call. + + Args: + timeout: A length of time in seconds to allow for the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deadline(self): + """Returns the time until which the operation is allowed to run. + + Returns: + The time (seconds since the epoch) at which the operation will expire. + """ + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self): + """Indicates to this manager that the operation has terminated.""" + raise NotImplementedError() + + +class EmissionManager(base.Operator): + """A manager of values emitted by customer code.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def advance( + self, initial_metadata=None, payload=None, completion=None, + allowance=None): + """Accepts a value emitted by customer code. + + This method should only be called by customer code. + + Args: + initial_metadata: An initial metadata value emitted by the local customer + to be sent to the other side of the operation. + payload: A payload value emitted by the local customer to be sent to the + other side of the operation. + completion: A Completion value emitted by the local customer to be sent to + the other side of the operation. + allowance: A positive integer indicating an additional number of payloads + that the local customer is willing to accept from the other side of the + operation. + """ + raise NotImplementedError() + + +class IngestionManager(object): + """A manager responsible for executing customer code. + + This name of this manager comes from its responsibility to pass successive + values from the other side of the operation into the code of the local + customer. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_group_and_method(self, group, method): + """Communicates to this IngestionManager the operation group and method. + + Args: + group: The group identifier of the operation. + method: The method identifier of the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_local_allowance(self, allowance): + """Communicates to this IngestionManager that more payloads may be ingested. + + Args: + allowance: A positive integer indicating an additional number of payloads + that the local customer is willing to ingest. + """ + raise NotImplementedError() + + @abc.abstractmethod + def local_emissions_done(self): + """Indicates to this manager that local emissions are done.""" + raise NotImplementedError() + + @abc.abstractmethod + def advance(self, initial_metadata, payload, completion, allowance): + """Advances the operation by passing values to the local customer.""" + raise NotImplementedError() + + +class ReceptionManager(object): + """A manager responsible for receiving tickets from the other end.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def receive_ticket(self, ticket): + """Handle a ticket from the other side of the operation. + + Args: + ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket + appropriate to this end of the operation and this object. + """ + raise NotImplementedError() + + +class Operation(object): + """An ongoing operation. + + Attributes: + context: A base.OperationContext object for the operation. + operator: A base.Operator object for the operation for use by the customer + of the operation. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def handle_ticket(self, ticket): + """Handle a ticket from the other side of the operation. + + Args: + ticket: A links.Ticket from the other side of the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def abort(self, outcome): + """Aborts the operation. + + Args: + outcome: A base.Outcome value indicating operation abortion. + """ + raise NotImplementedError() diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py new file mode 100644 index 0000000000..d20e40a53d --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_operation.py @@ -0,0 +1,192 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Implementation of operations.""" + +import threading + +# _utilities is referenced from specification in this module. +from grpc.framework.core import _context +from grpc.framework.core import _emission +from grpc.framework.core import _expiration +from grpc.framework.core import _ingestion +from grpc.framework.core import _interfaces +from grpc.framework.core import _reception +from grpc.framework.core import _termination +from grpc.framework.core import _transmission +from grpc.framework.core import _utilities # pylint: disable=unused-import + + +class _EasyOperation(_interfaces.Operation): + """A trivial implementation of interfaces.Operation.""" + + def __init__( + self, lock, termination_manager, transmission_manager, expiration_manager, + context, operator, reception_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + context: A base.OperationContext for use by the customer during the + operation. + operator: A base.Operator for use by the customer during the operation. + reception_manager: The _interfaces.ReceptionManager for the operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + self._reception_manager = reception_manager + + self.context = context + self.operator = operator + + def handle_ticket(self, ticket): + with self._lock: + self._reception_manager.receive_ticket(ticket) + + def abort(self, outcome): + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + +def invocation_operate( + operation_id, group, method, subscription, timeout, initial_metadata, + payload, completion, ticket_sink, termination_action, pool): + """Constructs objects necessary for front-side operation management. + + Args: + operation_id: An object identifying the operation. + group: The group identifier of the operation. + method: The method identifier of the operation. + subscription: A base.Subscription describing the customer's interest in the + results of the operation. + timeout: A length of time in seconds to allow for the operation. + initial_metadata: An initial metadata value to be sent to the other side of + the operation. May be None if the initial metadata will be passed later or + if there will be no initial metadata passed at all. + payload: The first payload value to be transmitted to the other side. May be + None if there is no such value or if the customer chose not to pass it at + operation invocation. + completion: A base.Completion value indicating the end of values passed to + the other side of the operation. + ticket_sink: A callable that accepts links.Tickets and delivers them to the + other side of the operation. + termination_action: A callable that accepts the outcome of the operation as + a base.Outcome value to be called on operation completion. + pool: A thread pool with which to do the work of the operation. + + Returns: + An _interfaces.Operation for the operation. + """ + lock = threading.Lock() + with lock: + termination_manager = _termination.invocation_termination_manager( + termination_action, pool) + transmission_manager = _transmission.TransmissionManager( + operation_id, ticket_sink, lock, pool, termination_manager) + expiration_manager = _expiration.invocation_expiration_manager( + timeout, lock, termination_manager, transmission_manager) + operation_context = _context.OperationContext( + lock, termination_manager, transmission_manager, expiration_manager) + emission_manager = _emission.EmissionManager( + lock, termination_manager, transmission_manager, expiration_manager) + ingestion_manager = _ingestion.invocation_ingestion_manager( + subscription, lock, pool, termination_manager, transmission_manager, + expiration_manager) + reception_manager = _reception.ReceptionManager( + termination_manager, transmission_manager, expiration_manager, + ingestion_manager) + + termination_manager.set_expiration_manager(expiration_manager) + transmission_manager.set_expiration_manager(expiration_manager) + emission_manager.set_ingestion_manager(ingestion_manager) + + transmission_manager.kick_off( + group, method, timeout, initial_metadata, payload, completion, None) + + return _EasyOperation( + lock, termination_manager, transmission_manager, expiration_manager, + operation_context, emission_manager, reception_manager) + + +def service_operate( + servicer_package, ticket, ticket_sink, termination_action, pool): + """Constructs an Operation for service of an operation. + + Args: + servicer_package: A _utilities.ServicerPackage to be used servicing the + operation. + ticket: The first links.Ticket received for the operation. + ticket_sink: A callable that accepts links.Tickets and delivers them to the + other side of the operation. + termination_action: A callable that accepts the outcome of the operation as + a base.Outcome value to be called on operation completion. + pool: A thread pool with which to do the work of the operation. + + Returns: + An _interfaces.Operation for the operation. + """ + lock = threading.Lock() + with lock: + termination_manager = _termination.service_termination_manager( + termination_action, pool) + transmission_manager = _transmission.TransmissionManager( + ticket.operation_id, ticket_sink, lock, pool, termination_manager) + expiration_manager = _expiration.service_expiration_manager( + ticket.timeout, servicer_package.default_timeout, + servicer_package.maximum_timeout, lock, termination_manager, + transmission_manager) + operation_context = _context.OperationContext( + lock, termination_manager, transmission_manager, expiration_manager) + emission_manager = _emission.EmissionManager( + lock, termination_manager, transmission_manager, expiration_manager) + ingestion_manager = _ingestion.service_ingestion_manager( + servicer_package.servicer, operation_context, emission_manager, lock, + pool, termination_manager, transmission_manager, expiration_manager) + reception_manager = _reception.ReceptionManager( + termination_manager, transmission_manager, expiration_manager, + ingestion_manager) + + termination_manager.set_expiration_manager(expiration_manager) + transmission_manager.set_expiration_manager(expiration_manager) + emission_manager.set_ingestion_manager(ingestion_manager) + + reception_manager.receive_ticket(ticket) + + return _EasyOperation( + lock, termination_manager, transmission_manager, expiration_manager, + operation_context, emission_manager, reception_manager) diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py new file mode 100644 index 0000000000..b64faf8146 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_reception.py @@ -0,0 +1,137 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for ticket reception.""" + +from grpc.framework.core import _interfaces +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.base import utilities +from grpc.framework.interfaces.links import links + +_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = { + links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED, + links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED, + links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN, + links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE, + links.Ticket.Termination.TRANSMISSION_FAILURE: + base.Outcome.TRANSMISSION_FAILURE, + links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE, +} + + +class ReceptionManager(_interfaces.ReceptionManager): + """A ReceptionManager based around a _Receiver passed to it.""" + + def __init__( + self, termination_manager, transmission_manager, expiration_manager, + ingestion_manager): + """Constructor. + + Args: + termination_manager: The operation's _interfaces.TerminationManager. + transmission_manager: The operation's _interfaces.TransmissionManager. + expiration_manager: The operation's _interfaces.ExpirationManager. + ingestion_manager: The operation's _interfaces.IngestionManager. + """ + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + self._ingestion_manager = ingestion_manager + + self._lowest_unseen_sequence_number = 0 + self._out_of_sequence_tickets = {} + self._aborted = False + + def _abort(self, outcome): + self._aborted = True + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + def _sequence_failure(self, ticket): + """Determines a just-arrived ticket's sequential legitimacy. + + Args: + ticket: A just-arrived ticket. + + Returns: + True if the ticket is sequentially legitimate; False otherwise. + """ + if ticket.sequence_number < self._lowest_unseen_sequence_number: + return True + elif ticket.sequence_number in self._out_of_sequence_tickets: + return True + else: + return False + + def _process_one(self, ticket): + if ticket.sequence_number == 0: + self._ingestion_manager.set_group_and_method(ticket.group, ticket.method) + if ticket.timeout is not None: + self._expiration_manager.change_timeout(ticket.timeout) + if ticket.termination is None: + completion = None + else: + completion = utilities.completion( + ticket.terminal_metadata, ticket.code, ticket.message) + self._ingestion_manager.advance( + ticket.initial_metadata, ticket.payload, completion, ticket.allowance) + if ticket.allowance is not None: + self._transmission_manager.allowance(ticket.allowance) + + def _process(self, ticket): + """Process those tickets ready to be processed. + + Args: + ticket: A just-arrived ticket the sequence number of which matches this + _ReceptionManager's _lowest_unseen_sequence_number field. + """ + while True: + self._process_one(ticket) + next_ticket = self._out_of_sequence_tickets.pop( + ticket.sequence_number + 1, None) + if next_ticket is None: + self._lowest_unseen_sequence_number = ticket.sequence_number + 1 + return + else: + ticket = next_ticket + + def receive_ticket(self, ticket): + """See _interfaces.ReceptionManager.receive_ticket for specification.""" + if self._aborted: + return + elif self._sequence_failure(ticket): + self._abort(base.Outcome.RECEPTION_FAILURE) + elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION): + outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination] + self._abort(outcome) + elif ticket.sequence_number == self._lowest_unseen_sequence_number: + self._process(ticket) + else: + self._out_of_sequence_tickets[ticket.sequence_number] = ticket diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py new file mode 100644 index 0000000000..ad9f6123d8 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_termination.py @@ -0,0 +1,212 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation termination.""" + +import abc + +from grpc.framework.core import _constants +from grpc.framework.core import _interfaces +from grpc.framework.foundation import callable_util +from grpc.framework.interfaces.base import base + + +def _invocation_completion_predicate( + unused_emission_complete, unused_transmission_complete, + unused_reception_complete, ingestion_complete): + return ingestion_complete + + +def _service_completion_predicate( + unused_emission_complete, transmission_complete, unused_reception_complete, + unused_ingestion_complete): + return transmission_complete + + +class TerminationManager(_interfaces.TerminationManager): + """A _interfaces.TransmissionManager on which another manager may be set.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_expiration_manager(self, expiration_manager): + """Sets the expiration manager with which this manager will interact. + + Args: + expiration_manager: The _interfaces.ExpirationManager associated with the + current operation. + """ + raise NotImplementedError() + + +class _TerminationManager(TerminationManager): + """An implementation of TerminationManager.""" + + def __init__(self, predicate, action, pool): + """Constructor. + + Args: + predicate: One of _invocation_completion_predicate or + _service_completion_predicate to be used to determine when the operation + has completed. + action: A behavior to pass the operation outcome on operation termination. + pool: A thread pool. + """ + self._predicate = predicate + self._action = action + self._pool = pool + self._expiration_manager = None + + self.outcome = None + self._callbacks = [] + + self._emission_complete = False + self._transmission_complete = False + self._reception_complete = False + self._ingestion_complete = False + + def set_expiration_manager(self, expiration_manager): + self._expiration_manager = expiration_manager + + def _terminate_internal_only(self, outcome): + """Terminates the operation. + + Args: + outcome: A base.Outcome describing the outcome of the operation. + """ + self.outcome = outcome + callbacks = list(self._callbacks) + self._callbacks = None + + act = callable_util.with_exceptions_logged( + self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE) + + if outcome is base.Outcome.LOCAL_FAILURE: + self._pool.submit(act, outcome) + else: + def call_callbacks_and_act(callbacks, outcome): + for callback in callbacks: + callback_outcome = callable_util.call_logging_exceptions( + callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE, + outcome) + if callback_outcome.exception is not None: + outcome = base.Outcome.LOCAL_FAILURE + break + act(outcome) + + self._pool.submit( + callable_util.with_exceptions_logged( + call_callbacks_and_act, _constants.INTERNAL_ERROR_LOG_MESSAGE), + callbacks, outcome) + + def _terminate_and_notify(self, outcome): + self._terminate_internal_only(outcome) + self._expiration_manager.terminate() + + def _perhaps_complete(self): + if self._predicate( + self._emission_complete, self._transmission_complete, + self._reception_complete, self._ingestion_complete): + self._terminate_and_notify(base.Outcome.COMPLETED) + return True + else: + return False + + def is_active(self): + """See _interfaces.TerminationManager.is_active for specification.""" + return self.outcome is None + + def add_callback(self, callback): + """See _interfaces.TerminationManager.add_callback for specification.""" + if self.outcome is None: + self._callbacks.append(callback) + return None + else: + return self.outcome + + def emission_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._emission_complete = True + self._perhaps_complete() + + def transmission_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._transmission_complete = True + return self._perhaps_complete() + else: + return False + + def reception_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._reception_complete = True + self._perhaps_complete() + + def ingestion_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._ingestion_complete = True + self._perhaps_complete() + + def expire(self): + """See _interfaces.TerminationManager.expire for specification.""" + self._terminate_internal_only(base.Outcome.EXPIRED) + + def abort(self, outcome): + """See _interfaces.TerminationManager.abort for specification.""" + self._terminate_and_notify(outcome) + + +def invocation_termination_manager(action, pool): + """Creates a TerminationManager appropriate for invocation-side use. + + Args: + action: An action to call on operation termination. + pool: A thread pool in which to execute the passed action and any + termination callbacks that are registered during the operation. + + Returns: + A TerminationManager appropriate for invocation-side use. + """ + return _TerminationManager(_invocation_completion_predicate, action, pool) + + +def service_termination_manager(action, pool): + """Creates a TerminationManager appropriate for service-side use. + + Args: + action: An action to call on operation termination. + pool: A thread pool in which to execute the passed action and any + termination callbacks that are registered during the operation. + + Returns: + A TerminationManager appropriate for service-side use. + """ + return _TerminationManager(_service_completion_predicate, action, pool) diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py new file mode 100644 index 0000000000..01894d398d --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_transmission.py @@ -0,0 +1,294 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for ticket transmission during an operation.""" + +from grpc.framework.core import _constants +from grpc.framework.core import _interfaces +from grpc.framework.foundation import callable_util +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.links import links + +_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' + + +def _explode_completion(completion): + if completion is None: + return None, None, None, None + else: + return ( + completion.terminal_metadata, completion.code, completion.message, + links.Ticket.Termination.COMPLETION) + + +class TransmissionManager(_interfaces.TransmissionManager): + """An _interfaces.TransmissionManager that sends links.Tickets.""" + + def __init__( + self, operation_id, ticket_sink, lock, pool, termination_manager): + """Constructor. + + Args: + operation_id: The operation's ID. + ticket_sink: A callable that accepts tickets and sends them to the other + side of the operation. + lock: The operation-servicing-wide lock object. + pool: A thread pool in which the work of transmitting tickets will be + performed. + termination_manager: The _interfaces.TerminationManager associated with + this operation. + """ + self._lock = lock + self._pool = pool + self._ticket_sink = ticket_sink + self._operation_id = operation_id + self._termination_manager = termination_manager + self._expiration_manager = None + + self._lowest_unused_sequence_number = 0 + self._remote_allowance = 1 + self._remote_complete = False + self._timeout = None + self._local_allowance = 0 + self._initial_metadata = None + self._payloads = [] + self._completion = None + self._aborted = False + self._abortion_outcome = None + self._transmitting = False + + def set_expiration_manager(self, expiration_manager): + """Sets the ExpirationManager with which this manager will cooperate.""" + self._expiration_manager = expiration_manager + + def _next_ticket(self): + """Creates the next ticket to be transmitted. + + Returns: + A links.Ticket to be sent to the other side of the operation or None if + there is nothing to be sent at this time. + """ + if self._aborted: + if self._abortion_outcome is None: + return None + else: + termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[ + self._abortion_outcome] + if termination is None: + return None + else: + self._abortion_outcome = None + return links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, + None, None, None, None, None, None, None, None, None, + termination) + + action = False + # TODO(nathaniel): Support other subscriptions. + local_subscription = links.Ticket.Subscription.FULL + timeout = self._timeout + if timeout is not None: + self._timeout = None + action = True + if self._local_allowance <= 0: + allowance = None + else: + allowance = self._local_allowance + self._local_allowance = 0 + action = True + initial_metadata = self._initial_metadata + if initial_metadata is not None: + self._initial_metadata = None + action = True + if not self._payloads or self._remote_allowance <= 0: + payload = None + else: + payload = self._payloads.pop(0) + self._remote_allowance -= 1 + action = True + if self._completion is None or self._payloads: + terminal_metadata, code, message, termination = None, None, None, None + else: + terminal_metadata, code, message, termination = _explode_completion( + self._completion) + self._completion = None + action = True + + if action: + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + local_subscription, timeout, allowance, initial_metadata, payload, + terminal_metadata, code, message, termination) + self._lowest_unused_sequence_number += 1 + return ticket + else: + return None + + def _transmit(self, ticket): + """Commences the transmission loop sending tickets. + + Args: + ticket: A links.Ticket to be sent to the other side of the operation. + """ + def transmit(ticket): + while True: + transmission_outcome = callable_util.call_logging_exceptions( + self._ticket_sink, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket) + if transmission_outcome.exception is None: + with self._lock: + if ticket.termination is links.Ticket.Termination.COMPLETION: + self._termination_manager.transmission_complete() + ticket = self._next_ticket() + if ticket is None: + self._transmitting = False + return + else: + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE) + self._expiration_manager.terminate() + return + + self._pool.submit(callable_util.with_exceptions_logged( + transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket) + self._transmitting = True + + def kick_off( + self, group, method, timeout, initial_metadata, payload, completion, + allowance): + """See _interfaces.TransmissionManager.kickoff for specification.""" + # TODO(nathaniel): Support other subscriptions. + subscription = links.Ticket.Subscription.FULL + terminal_metadata, code, message, termination = _explode_completion( + completion) + self._remote_allowance = 1 if payload is None else 0 + ticket = links.Ticket( + self._operation_id, 0, group, method, subscription, timeout, allowance, + initial_metadata, payload, terminal_metadata, code, message, + termination) + self._lowest_unused_sequence_number = 1 + self._transmit(ticket) + + def advance(self, initial_metadata, payload, completion, allowance): + """See _interfaces.TransmissionManager.advance for specification.""" + effective_initial_metadata = initial_metadata + effective_payload = payload + effective_completion = completion + if allowance is not None and not self._remote_complete: + effective_allowance = allowance + else: + effective_allowance = None + if self._transmitting: + if effective_initial_metadata is not None: + self._initial_metadata = effective_initial_metadata + if effective_payload is not None: + self._payloads.append(effective_payload) + if effective_completion is not None: + self._completion = effective_completion + if effective_allowance is not None: + self._local_allowance += effective_allowance + else: + if effective_payload is not None: + if 0 < self._remote_allowance: + ticket_payload = effective_payload + self._remote_allowance -= 1 + else: + self._payloads.append(effective_payload) + ticket_payload = None + else: + ticket_payload = None + if effective_completion is not None and not self._payloads: + ticket_completion = effective_completion + else: + self._completion = effective_completion + ticket_completion = None + if any( + (effective_initial_metadata, ticket_payload, ticket_completion, + effective_allowance)): + terminal_metadata, code, message, termination = _explode_completion( + completion) + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + None, None, allowance, effective_initial_metadata, ticket_payload, + terminal_metadata, code, message, termination) + self._lowest_unused_sequence_number += 1 + self._transmit(ticket) + + def timeout(self, timeout): + """See _interfaces.TransmissionManager.timeout for specification.""" + if self._transmitting: + self._timeout = timeout + else: + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + None, timeout, None, None, None, None, None, None, None) + self._lowest_unused_sequence_number += 1 + self._transmit(ticket) + + def allowance(self, allowance): + """See _interfaces.TransmissionManager.allowance for specification.""" + if self._transmitting or not self._payloads: + self._remote_allowance += allowance + else: + self._remote_allowance += allowance - 1 + payload = self._payloads.pop(0) + if self._payloads: + completion = None + else: + completion = self._completion + self._completion = None + terminal_metadata, code, message, termination = _explode_completion( + completion) + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + None, None, None, None, payload, terminal_metadata, code, message, + termination) + self._lowest_unused_sequence_number += 1 + self._transmit(ticket) + + def remote_complete(self): + """See _interfaces.TransmissionManager.remote_complete for specification.""" + self._remote_complete = True + self._local_allowance = 0 + + def abort(self, outcome): + """See _interfaces.TransmissionManager.abort for specification.""" + if self._transmitting: + self._aborted, self._abortion_outcome = True, outcome + else: + self._aborted = True + if outcome is not None: + termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[ + outcome] + if termination is not None: + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, + None, None, None, None, None, None, None, None, None, + termination) + self._transmit(ticket) diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py new file mode 100644 index 0000000000..5b0d798751 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_utilities.py @@ -0,0 +1,46 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Package-internal utilities.""" + +import collections + + +class ServicerPackage( + collections.namedtuple( + 'ServicerPackage', ('servicer', 'default_timeout', 'maximum_timeout'))): + """A trivial bundle class. + + Attributes: + servicer: A base.Servicer. + default_timeout: A float indicating the length of time in seconds to allow + for an operation invoked without a timeout. + maximum_timeout: A float indicating the maximum length of time in seconds to + allow for an operation. + """ diff --git a/src/python/grpcio/grpc/framework/core/implementations.py b/src/python/grpcio/grpc/framework/core/implementations.py new file mode 100644 index 0000000000..364a7faed4 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/implementations.py @@ -0,0 +1,62 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Entry points into the ticket-exchange-based base layer implementation.""" + +# base and links are referenced from specification in this module. +from grpc.framework.core import _end +from grpc.framework.interfaces.base import base # pylint: disable=unused-import +from grpc.framework.interfaces.links import links # pylint: disable=unused-import + + +def invocation_end_link(): + """Creates a base.End-links.Link suitable for operation invocation. + + Returns: + An object that is both a base.End and a links.Link, that supports operation + invocation, and that translates operation invocation into ticket exchange. + """ + return _end.serviceless_end_link() + + +def service_end_link(servicer, default_timeout, maximum_timeout): + """Creates a base.End-links.Link suitable for operation service. + + Args: + servicer: A base.Servicer for servicing operations. + default_timeout: A length of time in seconds to be used as the default + time alloted for a single operation. + maximum_timeout: A length of time in seconds to be used as the maximum + time alloted for a single operation. + + Returns: + An object that is both a base.End and a links.Link and that services + operations that arrive at it through ticket exchange. + """ + return _end.serviceful_end_link(servicer, default_timeout, maximum_timeout) diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py index 9d1651daac..76e0a5bdae 100644 --- a/src/python/grpcio/grpc/framework/interfaces/base/base.py +++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py @@ -27,10 +27,20 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""The base interface of RPC Framework.""" +"""The base interface of RPC Framework. +Implementations of this interface support the conduct of "operations": +exchanges between two distinct ends of an arbitrary number of data payloads +and metadata such as a name for the operation, initial and terminal metadata +in each direction, and flow control. These operations may be used for transfers +of data, remote procedure calls, status indication, or anything else +applications choose. +""" + +# threading is referenced from specification in this module. import abc import enum +import threading # abandonment is referenced from specification in this module. from grpc.framework.foundation import abandonment # pylint: disable=unused-import @@ -208,19 +218,26 @@ class End(object): raise NotImplementedError() @abc.abstractmethod - def stop_gracefully(self): - """Gracefully stops this object's service of operations. + def stop(self, grace): + """Stops this object's service of operations. - Operations in progress will be allowed to complete, and this method blocks - until all of them have. - """ - raise NotImplementedError() + This object will refuse service of new operations as soon as this method is + called but operations under way at the time of the call may be given a + grace period during which they are allowed to finish. - @abc.abstractmethod - def stop_immediately(self): - """Immediately stops this object's service of operations. + Args: + grace: A duration of time in seconds to allow ongoing operations to + terminate before being forcefully terminated by the stopping of this + End. May be zero to terminate all ongoing operations and immediately + stop. - Operations in progress will not be allowed to complete. + Returns: + A threading.Event that will be set to indicate all operations having + terminated and this End having completely stopped. The returned event + may not be set until after the full grace period (if some ongoing + operation continues for the full length of the period) or it may be set + much sooner (if for example this End had no operations in progress at + the time its stop method was called). """ raise NotImplementedError() diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py index 5ebbac8a6f..069ff024dd 100644 --- a/src/python/grpcio/grpc/framework/interfaces/links/links.py +++ b/src/python/grpcio/grpc/framework/interfaces/links/links.py @@ -98,7 +98,7 @@ class Ticket( COMPLETION = 'completion' CANCELLATION = 'cancellation' EXPIRATION = 'expiration' - LOCAL_SHUTDOWN = 'local shutdown' + SHUTDOWN = 'shutdown' RECEPTION_FAILURE = 'reception failure' TRANSMISSION_FAILURE = 'transmission failure' LOCAL_FAILURE = 'local failure' diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py new file mode 100644 index 0000000000..72b1ae5642 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py @@ -0,0 +1,165 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests the RPC Framework Core's implementation of the Base interface.""" + +import collections +import logging +import random +import time +import unittest + +from grpc._adapter import _intermediary_low +from grpc._links import invocation +from grpc._links import service +from grpc.framework.core import implementations +from grpc.framework.interfaces.base import utilities +from grpc_test import test_common as grpc_test_common +from grpc_test.framework.common import test_constants +from grpc_test.framework.interfaces.base import test_cases +from grpc_test.framework.interfaces.base import test_interfaces + +_INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),) +_SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),) +_SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),) +_CODE = _intermediary_low.Code.OK +_MESSAGE = b'test message' + + +class _SerializationBehaviors( + collections.namedtuple( + '_SerializationBehaviors', + ('request_serializers', 'request_deserializers', 'response_serializers', + 'response_deserializers',))): + pass + + +class _Links( + collections.namedtuple( + '_Links', + ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link', + 'service_end_link'))): + pass + + +def _serialization_behaviors_from_serializations(serializations): + request_serializers = {} + request_deserializers = {} + response_serializers = {} + response_deserializers = {} + for (group, method), serialization in serializations.iteritems(): + request_serializers[group, method] = serialization.serialize_request + request_deserializers[group, method] = serialization.deserialize_request + response_serializers[group, method] = serialization.serialize_response + response_deserializers[group, method] = serialization.deserialize_response + return _SerializationBehaviors( + request_serializers, request_deserializers, response_serializers, + response_deserializers) + + +class _Implementation(test_interfaces.Implementation): + + def instantiate(self, serializations, servicer): + serialization_behaviors = _serialization_behaviors_from_serializations( + serializations) + invocation_end_link = implementations.invocation_end_link() + service_end_link = implementations.service_end_link( + servicer, test_constants.DEFAULT_TIMEOUT, + test_constants.MAXIMUM_TIMEOUT) + service_grpc_link = service.service_link( + serialization_behaviors.request_deserializers, + serialization_behaviors.response_serializers) + port = service_grpc_link.add_port(0, None) + channel = _intermediary_low.Channel('localhost:%d' % port, None) + invocation_grpc_link = invocation.invocation_link( + channel, b'localhost', + serialization_behaviors.request_serializers, + serialization_behaviors.response_deserializers) + + invocation_end_link.join_link(invocation_grpc_link) + invocation_grpc_link.join_link(invocation_end_link) + service_end_link.join_link(service_grpc_link) + service_grpc_link.join_link(service_end_link) + invocation_grpc_link.start() + service_grpc_link.start() + return invocation_end_link, service_end_link, ( + invocation_grpc_link, service_grpc_link) + + def destantiate(self, memo): + invocation_grpc_link, service_grpc_link = memo + invocation_grpc_link.stop() + service_grpc_link.stop_gracefully() + + def invocation_initial_metadata(self): + return _INVOCATION_INITIAL_METADATA + + def service_initial_metadata(self): + return _SERVICE_INITIAL_METADATA + + def invocation_completion(self): + return utilities.completion(None, None, None) + + def service_completion(self): + return utilities.completion(_SERVICE_TERMINAL_METADATA, _CODE, _MESSAGE) + + def metadata_transmitted(self, original_metadata, transmitted_metadata): + return original_metadata is None or grpc_test_common.metadata_transmitted( + original_metadata, transmitted_metadata) + + def completion_transmitted(self, original_completion, transmitted_completion): + if (original_completion.terminal_metadata is not None and + not grpc_test_common.metadata_transmitted( + original_completion.terminal_metadata, + transmitted_completion.terminal_metadata)): + return False + elif original_completion.code is not transmitted_completion.code: + return False + elif original_completion.message != transmitted_completion.message: + return False + else: + return True + + +def setUpModule(): + logging.warn('setUpModule!') + + +def tearDownModule(): + logging.warn('tearDownModule!') + + +def load_tests(loader, tests, pattern): + return unittest.TestSuite( + tests=tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in test_cases.test_cases(_Implementation()))) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_test/framework/core/__init__.py b/src/python/grpcio_test/grpc_test/framework/core/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/core/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py new file mode 100644 index 0000000000..8d72f131d5 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py @@ -0,0 +1,96 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests the RPC Framework Core's implementation of the Base interface.""" + +import logging +import random +import time +import unittest + +from grpc.framework.core import implementations +from grpc.framework.interfaces.base import utilities +from grpc_test.framework.common import test_constants +from grpc_test.framework.interfaces.base import test_cases +from grpc_test.framework.interfaces.base import test_interfaces + + +class _Implementation(test_interfaces.Implementation): + + def __init__(self): + self._invocation_initial_metadata = object() + self._service_initial_metadata = object() + self._invocation_terminal_metadata = object() + self._service_terminal_metadata = object() + + def instantiate(self, serializations, servicer): + invocation = implementations.invocation_end_link() + service = implementations.service_end_link( + servicer, test_constants.DEFAULT_TIMEOUT, + test_constants.MAXIMUM_TIMEOUT) + invocation.join_link(service) + service.join_link(invocation) + return invocation, service, None + + def destantiate(self, memo): + pass + + def invocation_initial_metadata(self): + return self._invocation_initial_metadata + + def service_initial_metadata(self): + return self._service_initial_metadata + + def invocation_completion(self): + return utilities.completion(self._invocation_terminal_metadata, None, None) + + def service_completion(self): + return utilities.completion(self._service_terminal_metadata, None, None) + + def metadata_transmitted(self, original_metadata, transmitted_metadata): + return transmitted_metadata is original_metadata + + def completion_transmitted(self, original_completion, transmitted_completion): + return ( + (original_completion.terminal_metadata is + transmitted_completion.terminal_metadata) and + original_completion.code is transmitted_completion.code and + original_completion.message is transmitted_completion.message + ) + + +def load_tests(loader, tests, pattern): + return unittest.TestSuite( + tests=tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in test_cases.test_cases(_Implementation()))) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py index dd332fe5dd..5c8b176da4 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py @@ -211,8 +211,10 @@ class _OperationTest(unittest.TestCase): elif instruction.kind is _control.Instruction.Kind.CONCLUDE: break - invocation_end.stop_gracefully() - service_end.stop_gracefully() + invocation_stop_event = invocation_end.stop(0) + service_stop_event = service_end.stop(0) + invocation_stop_event.wait() + service_stop_event.wait() invocation_stats = invocation_end.operation_stats() service_stats = service_end.operation_stats() diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py index 6c2e3346aa..a2bd7107c1 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py @@ -29,9 +29,42 @@ """State and behavior appropriate for use in tests.""" +import logging import threading +import time from grpc.framework.interfaces.links import links +from grpc.framework.interfaces.links import utilities + +# A more-or-less arbitrary limit on the length of raw data values to be logged. +_UNCOMFORTABLY_LONG = 48 + + +def _safe_for_log_ticket(ticket): + """Creates a safe-for-printing-to-the-log ticket for a given ticket. + + Args: + ticket: Any links.Ticket. + + Returns: + A links.Ticket that is as much as can be equal to the given ticket but + possibly features values like the string "<payload of length 972321>" in + place of the actual values of the given ticket. + """ + if isinstance(ticket.payload, (basestring,)): + payload_length = len(ticket.payload) + else: + payload_length = -1 + if payload_length < _UNCOMFORTABLY_LONG: + return ticket + else: + return links.Ticket( + ticket.operation_id, ticket.sequence_number, + ticket.group, ticket.method, ticket.subscription, ticket.timeout, + ticket.allowance, ticket.initial_metadata, + '<payload of length {}>'.format(payload_length), + ticket.terminal_metadata, ticket.code, ticket.message, + ticket.termination) class RecordingLink(links.Link): @@ -64,3 +97,71 @@ class RecordingLink(links.Link): """Returns a copy of the list of all tickets received by this Link.""" with self._condition: return tuple(self._tickets) + + +class _Pipe(object): + """A conduit that logs all tickets passed through it.""" + + def __init__(self, name): + self._lock = threading.Lock() + self._name = name + self._left_mate = utilities.NULL_LINK + self._right_mate = utilities.NULL_LINK + + def accept_left_to_right_ticket(self, ticket): + with self._lock: + logging.warning( + '%s: moving left to right through %s: %s', time.time(), self._name, + _safe_for_log_ticket(ticket)) + try: + self._right_mate.accept_ticket(ticket) + except Exception as e: # pylint: disable=broad-except + logging.exception(e) + + def accept_right_to_left_ticket(self, ticket): + with self._lock: + logging.warning( + '%s: moving right to left through %s: %s', time.time(), self._name, + _safe_for_log_ticket(ticket)) + try: + self._left_mate.accept_ticket(ticket) + except Exception as e: # pylint: disable=broad-except + logging.exception(e) + + def join_left_mate(self, left_mate): + with self._lock: + self._left_mate = utilities.NULL_LINK if left_mate is None else left_mate + + def join_right_mate(self, right_mate): + with self._lock: + self._right_mate = ( + utilities.NULL_LINK if right_mate is None else right_mate) + + +class _Facade(links.Link): + + def __init__(self, accept, join): + self._accept = accept + self._join = join + + def accept_ticket(self, ticket): + self._accept(ticket) + + def join_link(self, link): + self._join(link) + + +def logging_links(name): + """Creates a conduit that logs all tickets passed through it. + + Args: + name: A name to use for the conduit to identify itself in logging output. + + Returns: + Two links.Links, the first of which is the "left" side of the conduit + and the second of which is the "right" side of the conduit. + """ + pipe = _Pipe(name) + left_facade = _Facade(pipe.accept_left_to_right_ticket, pipe.join_left_mate) + right_facade = _Facade(pipe.accept_right_to_left_ticket, pipe.join_right_mate) + return left_facade, right_facade |