aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio')
-rw-r--r--src/python/grpcio/commands.py24
-rw-r--r--src/python/grpcio/grpc/__init__.py39
-rw-r--r--src/python/grpcio/grpc/_adapter/.gitignore5
-rw-r--r--src/python/grpcio/grpc/_adapter/__init__.py30
-rw-r--r--src/python/grpcio/grpc/_adapter/_common.py76
-rw-r--r--src/python/grpcio/grpc/_adapter/_intermediary_low.py258
-rw-r--r--src/python/grpcio/grpc/_adapter/_low.py229
-rw-r--r--src/python/grpcio/grpc/_adapter/_types.py446
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi3
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi3
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi16
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi9
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi9
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi3
-rw-r--r--src/python/grpcio/grpc/_links/__init__.py30
-rw-r--r--src/python/grpcio/grpc/_links/_constants.py42
-rw-r--r--src/python/grpcio/grpc/_links/invocation.py453
-rw-r--r--src/python/grpcio/grpc/_links/service.py509
-rw-r--r--src/python/grpcio/grpc/beta/_server.py209
-rw-r--r--src/python/grpcio/grpc/beta/_stub.py155
-rw-r--r--src/python/grpcio/grpc/beta/implementations.py1
-rw-r--r--src/python/grpcio/grpc/framework/core/__init__.py30
-rw-r--r--src/python/grpcio/grpc/framework/core/_constants.py60
-rw-r--r--src/python/grpcio/grpc/framework/core/_context.py94
-rw-r--r--src/python/grpcio/grpc/framework/core/_emission.py100
-rw-r--r--src/python/grpcio/grpc/framework/core/_end.py244
-rw-r--r--src/python/grpcio/grpc/framework/core/_expiration.py154
-rw-r--r--src/python/grpcio/grpc/framework/core/_ingestion.py439
-rw-r--r--src/python/grpcio/grpc/framework/core/_interfaces.py331
-rw-r--r--src/python/grpcio/grpc/framework/core/_operation.py204
-rw-r--r--src/python/grpcio/grpc/framework/core/_protocol.py176
-rw-r--r--src/python/grpcio/grpc/framework/core/_reception.py159
-rw-r--r--src/python/grpcio/grpc/framework/core/_termination.py229
-rw-r--r--src/python/grpcio/grpc/framework/core/_transmission.py335
-rw-r--r--src/python/grpcio/grpc/framework/core/_utilities.py54
-rw-r--r--src/python/grpcio/grpc/framework/core/implementations.py62
-rw-r--r--src/python/grpcio/grpc/framework/crust/__init__.py30
-rw-r--r--src/python/grpcio/grpc/framework/crust/_calls.py223
-rw-r--r--src/python/grpcio/grpc/framework/crust/_control.py584
-rw-r--r--src/python/grpcio/grpc/framework/crust/_service.py173
-rw-r--r--src/python/grpcio/grpc/framework/crust/implementations.py366
-rw-r--r--src/python/grpcio/grpc/framework/foundation/_timer_future.py228
-rw-r--r--src/python/grpcio/grpc/framework/foundation/activated.py65
-rw-r--r--src/python/grpcio/grpc/framework/foundation/later.py51
-rw-r--r--src/python/grpcio/grpc/framework/foundation/relay.py174
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/links/__init__.py30
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/links/links.py143
-rw-r--r--src/python/grpcio/grpc/framework/interfaces/links/utilities.py44
-rw-r--r--src/python/grpcio/grpc_version.py2
49 files changed, 58 insertions, 7275 deletions
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py
index 3f91954d5f..86a73fa836 100644
--- a/src/python/grpcio/commands.py
+++ b/src/python/grpcio/commands.py
@@ -58,10 +58,31 @@ CONF_PY_ADDENDUM = """
extensions.append('sphinx.ext.napoleon')
napoleon_google_docstring = True
napoleon_numpy_docstring = True
+napoleon_include_special_with_doc = True
html_theme = 'sphinx_rtd_theme'
"""
+API_GLOSSARY = """
+
+Glossary
+================
+
+.. glossary::
+
+ metadatum
+ A key-value pair included in the HTTP header. It is a
+ 2-tuple where the first entry is the key and the
+ second is the value, i.e. (key, value). The metadata key is an ASCII str,
+ and must be a valid HTTP header name. The metadata value can be
+ either a valid HTTP ASCII str, or bytes. If bytes are provided,
+ the key must end with '-bin', i.e.
+ ``('binary-metadata-bin', b'\\x00\\xFF')``
+
+ metadata
+ A sequence of metadatum.
+"""
+
class CommandError(Exception):
"""Simple exception class for GRPC custom commands."""
@@ -131,6 +152,9 @@ class SphinxDocumentation(setuptools.Command):
conf_filepath = os.path.join('doc', 'src', 'conf.py')
with open(conf_filepath, 'a') as conf_file:
conf_file.write(CONF_PY_ADDENDUM)
+ glossary_filepath = os.path.join('doc', 'src', 'grpc.rst')
+ with open(glossary_filepath, 'a') as glossary_filepath:
+ glossary_filepath.write(API_GLOSSARY)
sphinx.main(['', os.path.join('doc', 'src'), os.path.join('doc', 'build')])
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index afd7db1f75..fd015129f0 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -312,7 +312,7 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)):
This method blocks until the value is available.
Returns:
- The initial metadata as a sequence of pairs of bytes.
+ The initial :term:`metadata`.
"""
raise NotImplementedError()
@@ -323,7 +323,7 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)):
This method blocks until the value is available.
Returns:
- The trailing metadata as a sequence of pairs of bytes.
+ The trailing :term:`metadata`.
"""
raise NotImplementedError()
@@ -394,8 +394,7 @@ class AuthMetadataPluginCallback(six.with_metaclass(abc.ABCMeta)):
"""Inform the gRPC runtime of the metadata to construct a CallCredentials.
Args:
- metadata: An iterable of 2-sequences (e.g. tuples) of metadata key/value
- pairs.
+ metadata: The :term:`metadata` used to construct the CallCredentials.
error: An Exception to indicate error or None to indicate success.
"""
raise NotImplementedError()
@@ -442,7 +441,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
Args:
request: The request value for the RPC.
timeout: An optional duration of time in seconds to allow for the RPC.
- metadata: An optional sequence of pairs of bytes to be transmitted to the
+ metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
@@ -463,7 +462,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
Args:
request: The request value for the RPC.
timeout: An optional durating of time in seconds to allow for the RPC.
- metadata: An optional sequence of pairs of bytes to be transmitted to the
+ metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
@@ -484,7 +483,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
Args:
request: The request value for the RPC.
timeout: An optional duration of time in seconds to allow for the RPC.
- metadata: An optional sequence of pairs of bytes to be transmitted to the
+ metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
@@ -507,7 +506,7 @@ class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
Args:
request: The request value for the RPC.
timeout: An optional duration of time in seconds to allow for the RPC.
- metadata: An optional sequence of pairs of bytes to be transmitted to the
+ metadata: An optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
@@ -530,7 +529,7 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: An optional duration of time in seconds to allow for the RPC.
- metadata: An optional sequence of pairs of bytes to be transmitted to the
+ metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
@@ -553,7 +552,7 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: An optional duration of time in seconds to allow for the RPC.
- metadata: An optional sequence of pairs of bytes to be transmitted to the
+ metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
@@ -575,7 +574,7 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: An optional duration of time in seconds to allow for the RPC.
- metadata: An optional sequence of pairs of bytes to be transmitted to the
+ metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
@@ -599,7 +598,7 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: An optional duration of time in seconds to allow for the RPC.
- metadata: An optional sequence of pairs of bytes to be transmitted to the
+ metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
@@ -707,7 +706,7 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):
"""Accesses the metadata from the invocation-side of the RPC.
Returns:
- The invocation metadata object as a sequence of pairs of bytes.
+ The invocation :term:`metadata`.
"""
raise NotImplementedError()
@@ -728,8 +727,7 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):
service-side initial metadata to transmit.
Args:
- initial_metadata: The initial metadata of the RPC as a sequence of pairs
- of bytes.
+ initial_metadata: The initial :term:`metadata`.
"""
raise NotImplementedError()
@@ -741,8 +739,7 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):
service-side trailing metadata to transmit.
Args:
- trailing_metadata: The trailing metadata of the RPC as a sequence of pairs
- of bytes.
+ trailing_metadata: The trailing :term:`metadata`.
"""
raise NotImplementedError()
@@ -815,7 +812,7 @@ class HandlerCallDetails(six.with_metaclass(abc.ABCMeta)):
"""Describes an RPC that has just arrived for service.
Attributes:
method: The method name of the RPC.
- invocation_metadata: The metadata from the invocation side of the RPC.
+ invocation_metadata: The :term:`metadata` from the invocation side of the RPC.
"""
@@ -1219,9 +1216,9 @@ def server(thread_pool, handlers=None):
to service RPCs.
handlers: An optional sequence of GenericRpcHandlers to be used to service
RPCs after the returned Server is started. These handlers need not be the
- only handlers the returned Server will use to service RPCs; other
- handlers may later be added to the returned Server by calling its
- add_generic_rpc_handlers method any time before it is started.
+ only handlers the server will use to service RPCs; other handlers may
+ later be added by calling add_generic_rpc_handlers any time before the
+ returned Server is started.
Returns:
A Server with which RPCs can be serviced.
diff --git a/src/python/grpcio/grpc/_adapter/.gitignore b/src/python/grpcio/grpc/_adapter/.gitignore
deleted file mode 100644
index a6f96cd6db..0000000000
--- a/src/python/grpcio/grpc/_adapter/.gitignore
+++ /dev/null
@@ -1,5 +0,0 @@
-*.a
-*.so
-*.dll
-*.pyc
-*.pyd
diff --git a/src/python/grpcio/grpc/_adapter/__init__.py b/src/python/grpcio/grpc/_adapter/__init__.py
deleted file mode 100644
index 7086519106..0000000000
--- a/src/python/grpcio/grpc/_adapter/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/grpcio/grpc/_adapter/_common.py b/src/python/grpcio/grpc/_adapter/_common.py
deleted file mode 100644
index 492849f4cb..0000000000
--- a/src/python/grpcio/grpc/_adapter/_common.py
+++ /dev/null
@@ -1,76 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""State used by both invocation-side and service-side code."""
-
-import enum
-
-
-@enum.unique
-class HighWrite(enum.Enum):
- """The possible categories of high-level write state."""
-
- OPEN = 'OPEN'
- CLOSED = 'CLOSED'
-
-
-class WriteState(object):
- """A description of the state of writing to an RPC.
-
- Attributes:
- low: A side-specific value describing the low-level state of writing.
- high: A HighWrite value describing the high-level state of writing.
- pending: A list of bytestrings for the RPC waiting to be written to the
- other side of the RPC.
- """
-
- def __init__(self, low, high, pending):
- self.low = low
- self.high = high
- self.pending = pending
-
-
-class CommonRPCState(object):
- """A description of an RPC's state.
-
- Attributes:
- write: A WriteState describing the state of writing to the RPC.
- sequence_number: The lowest-unused sequence number for use in generating
- tickets locally describing the progress of the RPC.
- deserializer: The behavior to be used to deserialize payload bytestreams
- taken off the wire.
- serializer: The behavior to be used to serialize payloads to be sent on the
- wire.
- """
-
- def __init__(self, write, sequence_number, deserializer, serializer):
- self.write = write
- self.sequence_number = sequence_number
- self.deserializer = deserializer
- self.serializer = serializer
diff --git a/src/python/grpcio/grpc/_adapter/_intermediary_low.py b/src/python/grpcio/grpc/_adapter/_intermediary_low.py
deleted file mode 100644
index 9698ffeabf..0000000000
--- a/src/python/grpcio/grpc/_adapter/_intermediary_low.py
+++ /dev/null
@@ -1,258 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Temporary old _low-like layer.
-
-Eases refactoring burden while we overhaul the Python framework.
-
-Plan:
- The layers used to look like:
- ... # outside _adapter
- fore.py + rear.py # visible outside _adapter
- _low
- _c
- The layers currently look like:
- ... # outside _adapter
- fore.py + rear.py # visible outside _adapter
- _low_intermediary # adapter for new '_low' to old '_low'
- _low # new '_low'
- _c # new '_c'
- We will later remove _low_intermediary after refactoring of fore.py and
- rear.py according to the ticket system refactoring and get:
- ... # outside _adapter, refactored
- fore.py + rear.py # visible outside _adapter, refactored
- _low # new '_low'
- _c # new '_c'
-"""
-
-import collections
-import enum
-
-from grpc._adapter import _low
-from grpc._adapter import _types
-
-_IGNORE_ME_TAG = object()
-Code = _types.StatusCode
-WriteFlags = _types.OpWriteFlags
-
-
-class Status(collections.namedtuple('Status', ['code', 'details'])):
- """Describes an RPC's overall status."""
-
-
-class ServiceAcceptance(
- collections.namedtuple(
- 'ServiceAcceptance', ['call', 'method', 'host', 'deadline'])):
- """Describes an RPC on the service side at the start of service."""
-
-
-class Event(
- collections.namedtuple(
- 'Event',
- ['kind', 'tag', 'write_accepted', 'complete_accepted',
- 'service_acceptance', 'bytes', 'status', 'metadata'])):
- """Describes an event emitted from a completion queue."""
-
- @enum.unique
- class Kind(enum.Enum):
- """Describes the kind of an event."""
-
- STOP = object()
- WRITE_ACCEPTED = object()
- COMPLETE_ACCEPTED = object()
- SERVICE_ACCEPTED = object()
- READ_ACCEPTED = object()
- METADATA_ACCEPTED = object()
- FINISH = object()
-
-
-class _TagAdapter(collections.namedtuple('_TagAdapter', [
- 'user_tag',
- 'kind'
- ])):
- pass
-
-
-class Call(object):
- """Adapter from old _low.Call interface to new _low.Call."""
-
- def __init__(self, channel, completion_queue, method, host, deadline):
- self._internal = channel._internal.create_call(
- completion_queue._internal, method, host, deadline)
- self._metadata = []
-
- @staticmethod
- def _from_internal(internal):
- call = Call.__new__(Call)
- call._internal = internal
- call._metadata = []
- return call
-
- def invoke(self, completion_queue, metadata_tag, finish_tag):
- err = self._internal.start_batch([
- _types.OpArgs.send_initial_metadata(self._metadata)
- ], _IGNORE_ME_TAG)
- if err != _types.CallError.OK:
- return err
- err = self._internal.start_batch([
- _types.OpArgs.recv_initial_metadata()
- ], _TagAdapter(metadata_tag, Event.Kind.METADATA_ACCEPTED))
- if err != _types.CallError.OK:
- return err
- err = self._internal.start_batch([
- _types.OpArgs.recv_status_on_client()
- ], _TagAdapter(finish_tag, Event.Kind.FINISH))
- return err
-
- def write(self, message, tag, flags):
- return self._internal.start_batch([
- _types.OpArgs.send_message(message, flags)
- ], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED))
-
- def complete(self, tag):
- return self._internal.start_batch([
- _types.OpArgs.send_close_from_client()
- ], _TagAdapter(tag, Event.Kind.COMPLETE_ACCEPTED))
-
- def accept(self, completion_queue, tag):
- return self._internal.start_batch([
- _types.OpArgs.recv_close_on_server()
- ], _TagAdapter(tag, Event.Kind.FINISH))
-
- def add_metadata(self, key, value):
- self._metadata.append((key, value))
-
- def premetadata(self):
- result = self._internal.start_batch([
- _types.OpArgs.send_initial_metadata(self._metadata)
- ], _IGNORE_ME_TAG)
- self._metadata = []
- return result
-
- def read(self, tag):
- return self._internal.start_batch([
- _types.OpArgs.recv_message()
- ], _TagAdapter(tag, Event.Kind.READ_ACCEPTED))
-
- def status(self, status, tag):
- return self._internal.start_batch([
- _types.OpArgs.send_status_from_server(
- self._metadata, status.code, status.details)
- ], _TagAdapter(tag, Event.Kind.COMPLETE_ACCEPTED))
-
- def cancel(self):
- return self._internal.cancel()
-
- def peer(self):
- return self._internal.peer()
-
- def set_credentials(self, creds):
- return self._internal.set_credentials(creds)
-
-
-class Channel(object):
- """Adapter from old _low.Channel interface to new _low.Channel."""
-
- def __init__(self, hostport, channel_credentials, server_host_override=None):
- args = []
- if server_host_override:
- args.append((_types.GrpcChannelArgumentKeys.SSL_TARGET_NAME_OVERRIDE.value, server_host_override))
- self._internal = _low.Channel(hostport, args, channel_credentials)
-
-
-class CompletionQueue(object):
- """Adapter from old _low.CompletionQueue interface to new _low.CompletionQueue."""
-
- def __init__(self):
- self._internal = _low.CompletionQueue()
-
- def get(self, deadline=None):
- if deadline is None:
- ev = self._internal.next(float('+inf'))
- else:
- ev = self._internal.next(deadline)
- if ev is None:
- return None
- elif ev.tag is _IGNORE_ME_TAG:
- return self.get(deadline)
- elif ev.type == _types.EventType.QUEUE_SHUTDOWN:
- kind = Event.Kind.STOP
- tag = None
- write_accepted = None
- complete_accepted = None
- service_acceptance = None
- message_bytes = None
- status = None
- metadata = None
- elif ev.type == _types.EventType.OP_COMPLETE:
- kind = ev.tag.kind
- tag = ev.tag.user_tag
- write_accepted = ev.success if kind == Event.Kind.WRITE_ACCEPTED else None
- complete_accepted = ev.success if kind == Event.Kind.COMPLETE_ACCEPTED else None
- service_acceptance = ServiceAcceptance(Call._from_internal(ev.call), ev.call_details.method, ev.call_details.host, ev.call_details.deadline) if kind == Event.Kind.SERVICE_ACCEPTED else None
- message_bytes = ev.results[0].message if kind == Event.Kind.READ_ACCEPTED else None
- status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if len(ev.results) > 0 and ev.results[0].cancelled is not None else None
- metadata = ev.results[0].initial_metadata if (kind in [Event.Kind.SERVICE_ACCEPTED, Event.Kind.METADATA_ACCEPTED]) else (ev.results[0].trailing_metadata if kind == Event.Kind.FINISH else None)
- else:
- raise RuntimeError('unknown event')
- result_ev = Event(kind=kind, tag=tag, write_accepted=write_accepted, complete_accepted=complete_accepted, service_acceptance=service_acceptance, bytes=message_bytes, status=status, metadata=metadata)
- return result_ev
-
- def stop(self):
- self._internal.shutdown()
-
-
-class Server(object):
- """Adapter from old _low.Server interface to new _low.Server."""
-
- def __init__(self, completion_queue):
- self._internal = _low.Server(completion_queue._internal, [])
- self._internal_cq = completion_queue._internal
-
- def add_http2_addr(self, addr):
- return self._internal.add_http2_port(addr)
-
- def add_secure_http2_addr(self, addr, server_credentials):
- if server_credentials is None:
- return self._internal.add_http2_port(addr, None)
- else:
- return self._internal.add_http2_port(addr, server_credentials)
-
- def start(self):
- return self._internal.start()
-
- def service(self, tag):
- return self._internal.request_call(self._internal_cq, _TagAdapter(tag, Event.Kind.SERVICE_ACCEPTED))
-
- def cancel_all_calls(self):
- self._internal.cancel_all_calls()
-
- def stop(self):
- return self._internal.shutdown(_TagAdapter(None, Event.Kind.STOP))
-
diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py
deleted file mode 100644
index 48410167a0..0000000000
--- a/src/python/grpcio/grpc/_adapter/_low.py
+++ /dev/null
@@ -1,229 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import threading
-
-from grpc import _grpcio_metadata
-from grpc import _plugin_wrapping
-from grpc._cython import cygrpc
-from grpc._adapter import _types
-
-_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
-
-ChannelCredentials = cygrpc.ChannelCredentials
-CallCredentials = cygrpc.CallCredentials
-ServerCredentials = cygrpc.ServerCredentials
-
-channel_credentials_composite = cygrpc.channel_credentials_composite
-call_credentials_composite = cygrpc.call_credentials_composite
-
-def server_credentials_ssl(root_credentials, pair_sequence, force_client_auth):
- return cygrpc.server_credentials_ssl(
- root_credentials,
- [cygrpc.SslPemKeyCertPair(key, pem) for key, pem in pair_sequence],
- force_client_auth)
-
-def channel_credentials_ssl(
- root_certificates, private_key, certificate_chain):
- pair = None
- if private_key is not None or certificate_chain is not None:
- pair = cygrpc.SslPemKeyCertPair(private_key, certificate_chain)
- return cygrpc.channel_credentials_ssl(root_certificates, pair)
-
-
-call_credentials_metadata_plugin = (
- _plugin_wrapping.call_credentials_metadata_plugin)
-
-
-class CompletionQueue(_types.CompletionQueue):
-
- def __init__(self):
- self.completion_queue = cygrpc.CompletionQueue()
-
- def next(self, deadline=float('+inf')):
- raw_event = self.completion_queue.poll(cygrpc.Timespec(deadline))
- if raw_event.type == cygrpc.CompletionType.queue_timeout:
- return None
- event_type = raw_event.type
- event_tag = raw_event.tag
- event_call = Call(raw_event.operation_call)
- if raw_event.request_call_details:
- event_call_details = _types.CallDetails(
- raw_event.request_call_details.method,
- raw_event.request_call_details.host,
- float(raw_event.request_call_details.deadline))
- else:
- event_call_details = None
- event_success = raw_event.success
- event_results = []
- if raw_event.is_new_request:
- event_results.append(_types.OpResult(
- _types.OpType.RECV_INITIAL_METADATA, raw_event.request_metadata,
- None, None, None, None))
- else:
- if raw_event.batch_operations:
- for operation in raw_event.batch_operations:
- result_type = operation.type
- result_initial_metadata = operation.received_metadata_or_none
- result_trailing_metadata = operation.received_metadata_or_none
- result_message = operation.received_message_or_none
- if result_message is not None:
- result_message = result_message.bytes()
- result_cancelled = operation.received_cancelled_or_none
- if operation.has_status:
- result_status = _types.Status(
- operation.received_status_code_or_none,
- operation.received_status_details_or_none)
- else:
- result_status = None
- event_results.append(
- _types.OpResult(result_type, result_initial_metadata,
- result_trailing_metadata, result_message,
- result_status, result_cancelled))
- return _types.Event(event_type, event_tag, event_call, event_call_details,
- event_results, event_success)
-
- def shutdown(self):
- self.completion_queue.shutdown()
-
-
-class Call(_types.Call):
-
- def __init__(self, call):
- self.call = call
-
- def start_batch(self, ops, tag):
- translated_ops = []
- for op in ops:
- if op.type == _types.OpType.SEND_INITIAL_METADATA:
- translated_op = cygrpc.operation_send_initial_metadata(
- cygrpc.Metadata(
- cygrpc.Metadatum(key, value)
- for key, value in op.initial_metadata),
- op.flags)
- elif op.type == _types.OpType.SEND_MESSAGE:
- translated_op = cygrpc.operation_send_message(op.message, op.flags)
- elif op.type == _types.OpType.SEND_CLOSE_FROM_CLIENT:
- translated_op = cygrpc.operation_send_close_from_client(op.flags)
- elif op.type == _types.OpType.SEND_STATUS_FROM_SERVER:
- translated_op = cygrpc.operation_send_status_from_server(
- cygrpc.Metadata(
- cygrpc.Metadatum(key, value)
- for key, value in op.trailing_metadata),
- op.status.code,
- op.status.details,
- op.flags)
- elif op.type == _types.OpType.RECV_INITIAL_METADATA:
- translated_op = cygrpc.operation_receive_initial_metadata(
- op.flags)
- elif op.type == _types.OpType.RECV_MESSAGE:
- translated_op = cygrpc.operation_receive_message(op.flags)
- elif op.type == _types.OpType.RECV_STATUS_ON_CLIENT:
- translated_op = cygrpc.operation_receive_status_on_client(
- op.flags)
- elif op.type == _types.OpType.RECV_CLOSE_ON_SERVER:
- translated_op = cygrpc.operation_receive_close_on_server(op.flags)
- else:
- raise ValueError('unexpected operation type {}'.format(op.type))
- translated_ops.append(translated_op)
- return self.call.start_batch(cygrpc.Operations(translated_ops), tag)
-
- def cancel(self, code=None, details=None):
- if code is None and details is None:
- return self.call.cancel()
- else:
- return self.call.cancel(code, details)
-
- def peer(self):
- return self.call.peer()
-
- def set_credentials(self, creds):
- return self.call.set_credentials(creds)
-
-
-class Channel(_types.Channel):
-
- def __init__(self, target, args, creds=None):
- args = list(args) + [
- (cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT)]
- args = cygrpc.ChannelArgs(
- cygrpc.ChannelArg(key, value) for key, value in args)
- if creds is None:
- self.channel = cygrpc.Channel(target, args)
- else:
- self.channel = cygrpc.Channel(target, args, creds)
-
- def create_call(self, completion_queue, method, host, deadline=None):
- internal_call = self.channel.create_call(
- None, 0, completion_queue.completion_queue, method, host,
- cygrpc.Timespec(deadline))
- return Call(internal_call)
-
- def check_connectivity_state(self, try_to_connect):
- return self.channel.check_connectivity_state(try_to_connect)
-
- def watch_connectivity_state(self, last_observed_state, deadline,
- completion_queue, tag):
- self.channel.watch_connectivity_state(
- last_observed_state, cygrpc.Timespec(deadline),
- completion_queue.completion_queue, tag)
-
- def target(self):
- return self.channel.target()
-
-
-_NO_TAG = object()
-
-class Server(_types.Server):
-
- def __init__(self, completion_queue, args):
- args = cygrpc.ChannelArgs(
- cygrpc.ChannelArg(key, value) for key, value in args)
- self.server = cygrpc.Server(args)
- self.server.register_completion_queue(completion_queue.completion_queue)
- self.server_queue = completion_queue
-
- def add_http2_port(self, addr, creds=None):
- if creds is None:
- return self.server.add_http2_port(addr)
- else:
- return self.server.add_http2_port(addr, creds)
-
- def start(self):
- return self.server.start()
-
- def shutdown(self, tag=None):
- return self.server.shutdown(self.server_queue.completion_queue, tag)
-
- def request_call(self, completion_queue, tag):
- return self.server.request_call(completion_queue.completion_queue,
- self.server_queue.completion_queue, tag)
-
- def cancel_all_calls(self):
- return self.server.cancel_all_calls()
diff --git a/src/python/grpcio/grpc/_adapter/_types.py b/src/python/grpcio/grpc/_adapter/_types.py
deleted file mode 100644
index b7cc6fbbb5..0000000000
--- a/src/python/grpcio/grpc/_adapter/_types.py
+++ /dev/null
@@ -1,446 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import abc
-import collections
-import enum
-
-import six
-
-from grpc._cython import cygrpc
-
-
-class GrpcChannelArgumentKeys(enum.Enum):
- """Mirrors keys used in grpc_channel_args for GRPC-specific arguments."""
- SSL_TARGET_NAME_OVERRIDE = 'grpc.ssl_target_name_override'
-
-
-@enum.unique
-class CallError(enum.IntEnum):
- """Mirrors grpc_call_error in the C core."""
- OK = cygrpc.CallError.ok
- ERROR = cygrpc.CallError.error
- ERROR_NOT_ON_SERVER = cygrpc.CallError.not_on_server
- ERROR_NOT_ON_CLIENT = cygrpc.CallError.not_on_client
- ERROR_ALREADY_ACCEPTED = cygrpc.CallError.already_accepted
- ERROR_ALREADY_INVOKED = cygrpc.CallError.already_invoked
- ERROR_NOT_INVOKED = cygrpc.CallError.not_invoked
- ERROR_ALREADY_FINISHED = cygrpc.CallError.already_finished
- ERROR_TOO_MANY_OPERATIONS = cygrpc.CallError.too_many_operations
- ERROR_INVALID_FLAGS = cygrpc.CallError.invalid_flags
- ERROR_INVALID_METADATA = cygrpc.CallError.invalid_metadata
-
-
-@enum.unique
-class StatusCode(enum.IntEnum):
- """Mirrors grpc_status_code in the C core."""
- OK = cygrpc.StatusCode.ok
- CANCELLED = cygrpc.StatusCode.cancelled
- UNKNOWN = cygrpc.StatusCode.unknown
- INVALID_ARGUMENT = cygrpc.StatusCode.invalid_argument
- DEADLINE_EXCEEDED = cygrpc.StatusCode.deadline_exceeded
- NOT_FOUND = cygrpc.StatusCode.not_found
- ALREADY_EXISTS = cygrpc.StatusCode.already_exists
- PERMISSION_DENIED = cygrpc.StatusCode.permission_denied
- RESOURCE_EXHAUSTED = cygrpc.StatusCode.resource_exhausted
- FAILED_PRECONDITION = cygrpc.StatusCode.failed_precondition
- ABORTED = cygrpc.StatusCode.aborted
- OUT_OF_RANGE = cygrpc.StatusCode.out_of_range
- UNIMPLEMENTED = cygrpc.StatusCode.unimplemented
- INTERNAL = cygrpc.StatusCode.internal
- UNAVAILABLE = cygrpc.StatusCode.unavailable
- DATA_LOSS = cygrpc.StatusCode.data_loss
- UNAUTHENTICATED = cygrpc.StatusCode.unauthenticated
-
-
-@enum.unique
-class OpWriteFlags(enum.IntEnum):
- """Mirrors defined write-flag constants in the C core."""
- WRITE_BUFFER_HINT = cygrpc.WriteFlag.buffer_hint
- WRITE_NO_COMPRESS = cygrpc.WriteFlag.no_compress
-
-
-@enum.unique
-class OpType(enum.IntEnum):
- """Mirrors grpc_op_type in the C core."""
- SEND_INITIAL_METADATA = cygrpc.OperationType.send_initial_metadata
- SEND_MESSAGE = cygrpc.OperationType.send_message
- SEND_CLOSE_FROM_CLIENT = cygrpc.OperationType.send_close_from_client
- SEND_STATUS_FROM_SERVER = cygrpc.OperationType.send_status_from_server
- RECV_INITIAL_METADATA = cygrpc.OperationType.receive_initial_metadata
- RECV_MESSAGE = cygrpc.OperationType.receive_message
- RECV_STATUS_ON_CLIENT = cygrpc.OperationType.receive_status_on_client
- RECV_CLOSE_ON_SERVER = cygrpc.OperationType.receive_close_on_server
-
-
-@enum.unique
-class EventType(enum.IntEnum):
- """Mirrors grpc_completion_type in the C core."""
- QUEUE_SHUTDOWN = cygrpc.CompletionType.queue_shutdown
- QUEUE_TIMEOUT = cygrpc.CompletionType.queue_timeout
- OP_COMPLETE = cygrpc.CompletionType.operation_complete
-
-
-@enum.unique
-class ConnectivityState(enum.IntEnum):
- """Mirrors grpc_connectivity_state in the C core."""
- IDLE = cygrpc.ConnectivityState.idle
- CONNECTING = cygrpc.ConnectivityState.connecting
- READY = cygrpc.ConnectivityState.ready
- TRANSIENT_FAILURE = cygrpc.ConnectivityState.transient_failure
- FATAL_FAILURE = cygrpc.ConnectivityState.shutdown
-
-
-class Status(collections.namedtuple(
- 'Status', [
- 'code',
- 'details',
- ])):
- """The end status of a GRPC call.
-
- Attributes:
- code (StatusCode): ...
- details (str): ...
- """
-
-
-class CallDetails(collections.namedtuple(
- 'CallDetails', [
- 'method',
- 'host',
- 'deadline',
- ])):
- """Provides information to the server about the client's call.
-
- Attributes:
- method (str): ...
- host (str): ...
- deadline (float): ...
- """
-
-
-class OpArgs(collections.namedtuple(
- 'OpArgs', [
- 'type',
- 'initial_metadata',
- 'trailing_metadata',
- 'message',
- 'status',
- 'flags',
- ])):
- """Arguments passed into a GRPC operation.
-
- Attributes:
- type (OpType): ...
- initial_metadata (sequence of 2-sequence of str): Only valid if type ==
- OpType.SEND_INITIAL_METADATA, else is None.
- trailing_metadata (sequence of 2-sequence of str): Only valid if type ==
- OpType.SEND_STATUS_FROM_SERVER, else is None.
- message (bytes): Only valid if type == OpType.SEND_MESSAGE, else is None.
- status (Status): Only valid if type == OpType.SEND_STATUS_FROM_SERVER, else
- is None.
- flags (int): a bitwise OR'ing of 0 or more OpWriteFlags values.
- """
-
- @staticmethod
- def send_initial_metadata(initial_metadata):
- return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None, 0)
-
- @staticmethod
- def send_message(message, flags):
- return OpArgs(OpType.SEND_MESSAGE, None, None, message, None, flags)
-
- @staticmethod
- def send_close_from_client():
- return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None, 0)
-
- @staticmethod
- def send_status_from_server(trailing_metadata, status_code, status_details):
- return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details), 0)
-
- @staticmethod
- def recv_initial_metadata():
- return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None, 0);
-
- @staticmethod
- def recv_message():
- return OpArgs(OpType.RECV_MESSAGE, None, None, None, None, 0)
-
- @staticmethod
- def recv_status_on_client():
- return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None, 0)
-
- @staticmethod
- def recv_close_on_server():
- return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None, 0)
-
-
-class OpResult(collections.namedtuple(
- 'OpResult', [
- 'type',
- 'initial_metadata',
- 'trailing_metadata',
- 'message',
- 'status',
- 'cancelled',
- ])):
- """Results received from a GRPC operation.
-
- Attributes:
- type (OpType): ...
- initial_metadata (sequence of 2-sequence of str): Only valid if type ==
- OpType.RECV_INITIAL_METADATA, else is None.
- trailing_metadata (sequence of 2-sequence of str): Only valid if type ==
- OpType.RECV_STATUS_ON_CLIENT, else is None.
- message (bytes): Only valid if type == OpType.RECV_MESSAGE, else is None.
- status (Status): Only valid if type == OpType.RECV_STATUS_ON_CLIENT, else
- is None.
- cancelled (bool): Only valid if type == OpType.RECV_CLOSE_ON_SERVER, else
- is None.
- """
-
-
-class Event(collections.namedtuple(
- 'Event', [
- 'type',
- 'tag',
- 'call',
- 'call_details',
- 'results',
- 'success',
- ])):
- """An event received from a GRPC completion queue.
-
- Attributes:
- type (EventType): ...
- tag (object): ...
- call (Call): The Call object associated with this event (if there is one,
- else None).
- call_details (CallDetails): The call details associated with the
- server-side call (if there is such information, else None).
- results (list of OpResult): ...
- success (bool): ...
- """
-
-
-class CompletionQueue(six.with_metaclass(abc.ABCMeta)):
-
- @abc.abstractmethod
- def __init__(self):
- pass
-
- def __iter__(self):
- """This class may be iterated over.
-
- This is the equivalent of calling next() repeatedly with an absolute
- deadline of None (i.e. no deadline).
- """
- return self
-
- def __next__(self):
- return self.next()
-
- @abc.abstractmethod
- def next(self, deadline=float('+inf')):
- """Get the next event on this completion queue.
-
- Args:
- deadline (float): absolute deadline in seconds from the Python epoch, or
- None for no deadline.
-
- Returns:
- Event: ...
- """
- pass
-
- @abc.abstractmethod
- def shutdown(self):
- """Begin the shutdown process of this completion queue.
-
- Note that this does not immediately destroy the completion queue.
- Nevertheless, user code should not pass it around after invoking this.
- """
- return None
-
-
-class Call(six.with_metaclass(abc.ABCMeta)):
-
- @abc.abstractmethod
- def start_batch(self, ops, tag):
- """Start a batch of operations.
-
- Args:
- ops (sequence of OpArgs): ...
- tag (object): ...
-
- Returns:
- CallError: ...
- """
- return CallError.ERROR
-
- @abc.abstractmethod
- def cancel(self, code=None, details=None):
- """Cancel the call.
-
- Args:
- code (int): Status code to cancel with (on the server side). If
- specified, so must `details`.
- details (str): Status details to cancel with (on the server side). If
- specified, so must `code`.
-
- Returns:
- CallError: ...
- """
- return CallError.ERROR
-
- @abc.abstractmethod
- def peer(self):
- """Get the peer of this call.
-
- Returns:
- str: the peer of this call.
- """
- return None
-
- def set_credentials(self, creds):
- """Set per-call credentials.
-
- Args:
- creds (CallCredentials): Credentials to be set for this call.
- """
- return None
-
-
-class Channel(six.with_metaclass(abc.ABCMeta)):
-
- @abc.abstractmethod
- def __init__(self, target, args, credentials=None):
- """Initialize a Channel.
-
- Args:
- target (str): ...
- args (sequence of 2-sequence of str, (str|integer)): ...
- credentials (ChannelCredentials): If None, create an insecure channel,
- else create a secure channel using the client credentials.
- """
-
- @abc.abstractmethod
- def create_call(self, completion_queue, method, host, deadline=float('+inf')):
- """Create a call from this channel.
-
- Args:
- completion_queue (CompletionQueue): ...
- method (str): ...
- host (str): ...
- deadline (float): absolute deadline in seconds from the Python epoch, or
- None for no deadline.
-
- Returns:
- Call: call object associated with this Channel and passed parameters.
- """
- return None
-
- @abc.abstractmethod
- def check_connectivity_state(self, try_to_connect):
- """Check and optionally repair the connectivity state of the channel.
-
- Args:
- try_to_connect (bool): whether or not to try to connect the channel if
- disconnected.
-
- Returns:
- ConnectivityState: state of the channel at the time of this invocation.
- """
- return None
-
- @abc.abstractmethod
- def watch_connectivity_state(self, last_observed_state, deadline,
- completion_queue, tag):
- """Watch for connectivity state changes from the last_observed_state.
-
- Args:
- last_observed_state (ConnectivityState): ...
- deadline (float): ...
- completion_queue (CompletionQueue): ...
- tag (object) ...
- """
-
- @abc.abstractmethod
- def target(self):
- """Get the target of this channel.
-
- Returns:
- str: the target of this channel.
- """
- return None
-
-
-class Server(six.with_metaclass(abc.ABCMeta)):
-
- @abc.abstractmethod
- def __init__(self, completion_queue, args):
- """Initialize a server.
-
- Args:
- completion_queue (CompletionQueue): ...
- args (sequence of 2-sequence of str, (str|integer)): ...
- """
-
- @abc.abstractmethod
- def add_http2_port(self, address, credentials=None):
- """Adds an HTTP/2 address+port to the server.
-
- Args:
- address (str): ...
- credentials (ServerCredentials): If None, create an insecure port, else
- create a secure port using the server credentials.
- """
-
- @abc.abstractmethod
- def start(self):
- """Starts the server."""
-
- @abc.abstractmethod
- def shutdown(self, tag=None):
- """Shuts down the server. Does not immediately destroy the server.
-
- Args:
- tag (object): if not None, have the server place an event on its
- completion queue notifying it when this server has completely shut down.
- """
-
- @abc.abstractmethod
- def request_call(self, completion_queue, tag):
- """Requests a call from the server on the server's completion queue.
-
- Args:
- completion_queue (CompletionQueue): Completion queue for the call. May be
- the same as the server's completion queue.
- tag (object) ...
- """
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index 6570dcdb85..ba60986143 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -105,8 +105,7 @@ cdef class Call:
def __dealloc__(self):
if self.c_call != NULL:
- with nogil:
- grpc_call_destroy(self.c_call)
+ grpc_call_destroy(self.c_call)
# The object *should* always be valid from Python. Used for debugging.
@property
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 1406696510..5416401431 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -102,5 +102,4 @@ cdef class Channel:
def __dealloc__(self):
if self.c_channel != NULL:
- with nogil:
- grpc_channel_destroy(self.c_channel)
+ grpc_channel_destroy(self.c_channel)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 90266516fe..5955021ceb 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -118,18 +118,14 @@ cdef class CompletionQueue:
def __dealloc__(self):
cdef gpr_timespec c_deadline
- with nogil:
- c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
+ c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
if self.c_completion_queue != NULL:
# Ensure shutdown
if not self.is_shutting_down:
- with nogil:
- grpc_completion_queue_shutdown(self.c_completion_queue)
- # Pump the queue
+ grpc_completion_queue_shutdown(self.c_completion_queue)
+ # Pump the queue (All outstanding calls should have been cancelled)
while not self.is_shutdown:
- with nogil:
- event = grpc_completion_queue_next(
- self.c_completion_queue, c_deadline, NULL)
+ event = grpc_completion_queue_next(
+ self.c_completion_queue, c_deadline, NULL)
self._interpret_event(event)
- with nogil:
- grpc_completion_queue_destroy(self.c_completion_queue)
+ grpc_completion_queue_destroy(self.c_completion_queue)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
index b24e69243e..035ac49a8b 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -46,8 +46,7 @@ cdef class ChannelCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
- with nogil:
- grpc_channel_credentials_release(self.c_credentials)
+ grpc_channel_credentials_release(self.c_credentials)
cdef class CallCredentials:
@@ -64,8 +63,7 @@ cdef class CallCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
- with nogil:
- grpc_call_credentials_release(self.c_credentials)
+ grpc_call_credentials_release(self.c_credentials)
cdef class ServerCredentials:
@@ -76,8 +74,7 @@ cdef class ServerCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
- with nogil:
- grpc_server_credentials_release(self.c_credentials)
+ grpc_server_credentials_release(self.c_credentials)
cdef class CredentialsMetadataPlugin:
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index b39b2f08de..54b3d00dfc 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -287,8 +287,7 @@ cdef class ByteBuffer:
def __dealloc__(self):
if self.c_byte_buffer != NULL:
- with nogil:
- grpc_byte_buffer_destroy(self.c_byte_buffer)
+ grpc_byte_buffer_destroy(self.c_byte_buffer)
cdef class SslPemKeyCertPair:
@@ -420,8 +419,7 @@ cdef class Metadata:
# this frees the allocated memory for the grpc_metadata_array (although
# it'd be nice if that were documented somewhere...)
# TODO(atash): document this in the C core
- with nogil:
- grpc_metadata_array_destroy(&self.c_metadata_array)
+ grpc_metadata_array_destroy(&self.c_metadata_array)
def __len__(self):
return self.c_metadata_array.count
@@ -530,8 +528,7 @@ cdef class Operation:
# Python. The remaining one(s) are primitive fields filled in by GRPC core.
# This means that we need to clean up after receive_status_on_client.
if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT:
- with nogil:
- gpr_free(self._received_status_details)
+ gpr_free(self._received_status_details)
def operation_send_initial_metadata(Metadata metadata, int flags):
cdef Operation op = Operation()
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index 3e03b6efe1..4f2d51b03f 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -171,5 +171,4 @@ cdef class Server:
# much but repeatedly release the GIL and wait
while not self.is_shutdown:
time.sleep(0)
- with nogil:
- grpc_server_destroy(self.c_server)
+ grpc_server_destroy(self.c_server)
diff --git a/src/python/grpcio/grpc/_links/__init__.py b/src/python/grpcio/grpc/_links/__init__.py
deleted file mode 100644
index 7086519106..0000000000
--- a/src/python/grpcio/grpc/_links/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/grpcio/grpc/_links/_constants.py b/src/python/grpcio/grpc/_links/_constants.py
deleted file mode 100644
index 117fc5a639..0000000000
--- a/src/python/grpcio/grpc/_links/_constants.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Constants for use within this package."""
-
-from grpc._adapter import _intermediary_low
-from grpc.beta import interfaces as beta_interfaces
-
-LOW_STATUS_CODE_TO_HIGH_STATUS_CODE = {
- low: high for low, high in zip(
- _intermediary_low.Code, beta_interfaces.StatusCode)
-}
-
-HIGH_STATUS_CODE_TO_LOW_STATUS_CODE = {
- high: low for low, high in LOW_STATUS_CODE_TO_HIGH_STATUS_CODE.items()
-}
diff --git a/src/python/grpcio/grpc/_links/invocation.py b/src/python/grpcio/grpc/_links/invocation.py
deleted file mode 100644
index 003653e1c8..0000000000
--- a/src/python/grpcio/grpc/_links/invocation.py
+++ /dev/null
@@ -1,453 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""The RPC-invocation-side bridge between RPC Framework and GRPC-on-the-wire."""
-
-import abc
-import enum
-import logging
-import threading
-import time
-
-import six
-
-from grpc._adapter import _intermediary_low
-from grpc._links import _constants
-from grpc.beta import interfaces as beta_interfaces
-from grpc.framework.foundation import activated
-from grpc.framework.foundation import logging_pool
-from grpc.framework.foundation import relay
-from grpc.framework.interfaces.links import links
-
-_IDENTITY = lambda x: x
-
-_STOP = _intermediary_low.Event.Kind.STOP
-_WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED
-_COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED
-_READ = _intermediary_low.Event.Kind.READ_ACCEPTED
-_METADATA = _intermediary_low.Event.Kind.METADATA_ACCEPTED
-_FINISH = _intermediary_low.Event.Kind.FINISH
-
-
-@enum.unique
-class _Read(enum.Enum):
- AWAITING_METADATA = 'awaiting metadata'
- READING = 'reading'
- AWAITING_ALLOWANCE = 'awaiting allowance'
- CLOSED = 'closed'
-
-
-@enum.unique
-class _HighWrite(enum.Enum):
- OPEN = 'open'
- CLOSED = 'closed'
-
-
-@enum.unique
-class _LowWrite(enum.Enum):
- OPEN = 'OPEN'
- ACTIVE = 'ACTIVE'
- CLOSED = 'CLOSED'
-
-
-class _Context(beta_interfaces.GRPCInvocationContext):
-
- def __init__(self):
- self._lock = threading.Lock()
- self._disable_next_compression = False
-
- def disable_next_request_compression(self):
- with self._lock:
- self._disable_next_compression = True
-
- def next_compression_disabled(self):
- with self._lock:
- disabled = self._disable_next_compression
- self._disable_next_compression = False
- return disabled
-
-
-class _RPCState(object):
-
- def __init__(
- self, call, request_serializer, response_deserializer, sequence_number,
- read, allowance, high_write, low_write, due, context):
- self.call = call
- self.request_serializer = request_serializer
- self.response_deserializer = response_deserializer
- self.sequence_number = sequence_number
- self.read = read
- self.allowance = allowance
- self.high_write = high_write
- self.low_write = low_write
- self.due = due
- self.context = context
-
-
-def _no_longer_due(kind, rpc_state, key, rpc_states):
- rpc_state.due.remove(kind)
- if not rpc_state.due:
- del rpc_states[key]
-
-
-class _Kernel(object):
-
- def __init__(
- self, channel, host, metadata_transformer, request_serializers,
- response_deserializers, ticket_relay):
- self._lock = threading.Lock()
- self._channel = channel
- self._host = host
- self._metadata_transformer = metadata_transformer
- self._request_serializers = request_serializers
- self._response_deserializers = response_deserializers
- self._relay = ticket_relay
-
- self._completion_queue = None
- self._rpc_states = {}
- self._pool = None
-
- def _on_write_event(self, operation_id, unused_event, rpc_state):
- if rpc_state.high_write is _HighWrite.CLOSED:
- rpc_state.call.complete(operation_id)
- rpc_state.due.add(_COMPLETE)
- rpc_state.due.remove(_WRITE)
- rpc_state.low_write = _LowWrite.CLOSED
- else:
- ticket = links.Ticket(
- operation_id, rpc_state.sequence_number, None, None, None, None, 1,
- None, None, None, None, None, None, None)
- rpc_state.sequence_number += 1
- self._relay.add_value(ticket)
- rpc_state.low_write = _LowWrite.OPEN
- _no_longer_due(_WRITE, rpc_state, operation_id, self._rpc_states)
-
- def _on_read_event(self, operation_id, event, rpc_state):
- if event.bytes is None or _FINISH not in rpc_state.due:
- rpc_state.read = _Read.CLOSED
- _no_longer_due(_READ, rpc_state, operation_id, self._rpc_states)
- else:
- if 0 < rpc_state.allowance:
- rpc_state.allowance -= 1
- rpc_state.call.read(operation_id)
- else:
- rpc_state.read = _Read.AWAITING_ALLOWANCE
- _no_longer_due(_READ, rpc_state, operation_id, self._rpc_states)
- ticket = links.Ticket(
- operation_id, rpc_state.sequence_number, None, None, None, None, None,
- None, rpc_state.response_deserializer(event.bytes), None, None, None,
- None, None)
- rpc_state.sequence_number += 1
- self._relay.add_value(ticket)
-
- def _on_metadata_event(self, operation_id, event, rpc_state):
- if _FINISH in rpc_state.due:
- rpc_state.allowance -= 1
- rpc_state.call.read(operation_id)
- rpc_state.read = _Read.READING
- rpc_state.due.add(_READ)
- rpc_state.due.remove(_METADATA)
- ticket = links.Ticket(
- operation_id, rpc_state.sequence_number, None, None,
- links.Ticket.Subscription.FULL, None, None, event.metadata, None,
- None, None, None, None, None)
- rpc_state.sequence_number += 1
- self._relay.add_value(ticket)
- else:
- _no_longer_due(_METADATA, rpc_state, operation_id, self._rpc_states)
-
- def _on_finish_event(self, operation_id, event, rpc_state):
- _no_longer_due(_FINISH, rpc_state, operation_id, self._rpc_states)
- if event.status.code == _intermediary_low.Code.OK:
- termination = links.Ticket.Termination.COMPLETION
- elif event.status.code == _intermediary_low.Code.CANCELLED:
- termination = links.Ticket.Termination.CANCELLATION
- elif event.status.code == _intermediary_low.Code.DEADLINE_EXCEEDED:
- termination = links.Ticket.Termination.EXPIRATION
- elif event.status.code == _intermediary_low.Code.UNIMPLEMENTED:
- termination = links.Ticket.Termination.REMOTE_FAILURE
- elif event.status.code == _intermediary_low.Code.UNKNOWN:
- termination = links.Ticket.Termination.LOCAL_FAILURE
- else:
- termination = links.Ticket.Termination.TRANSMISSION_FAILURE
- code = _constants.LOW_STATUS_CODE_TO_HIGH_STATUS_CODE[event.status.code]
- ticket = links.Ticket(
- operation_id, rpc_state.sequence_number, None, None, None, None, None,
- None, None, event.metadata, code, event.status.details, termination,
- None)
- rpc_state.sequence_number += 1
- self._relay.add_value(ticket)
-
- def _spin(self, completion_queue):
- while True:
- event = completion_queue.get(None)
- with self._lock:
- rpc_state = self._rpc_states.get(event.tag, None)
- if event.kind is _STOP:
- pass
- elif event.kind is _WRITE:
- self._on_write_event(event.tag, event, rpc_state)
- elif event.kind is _METADATA:
- self._on_metadata_event(event.tag, event, rpc_state)
- elif event.kind is _READ:
- self._on_read_event(event.tag, event, rpc_state)
- elif event.kind is _FINISH:
- self._on_finish_event(event.tag, event, rpc_state)
- elif event.kind is _COMPLETE:
- _no_longer_due(_COMPLETE, rpc_state, event.tag, self._rpc_states)
- else:
- logging.error('Illegal RPC event! %s', (event,))
-
- if self._completion_queue is None and not self._rpc_states:
- completion_queue.stop()
- return
-
- def _invoke(
- self, operation_id, group, method, initial_metadata, payload, termination,
- timeout, allowance, options):
- """Invoke an RPC.
-
- Args:
- operation_id: Any object to be used as an operation ID for the RPC.
- group: The group to which the RPC method belongs.
- method: The RPC method name.
- initial_metadata: The initial metadata object for the RPC.
- payload: A payload object for the RPC or None if no payload was given at
- invocation-time.
- termination: A links.Ticket.Termination value or None indicated whether or
- not more writes will follow from this side of the RPC.
- timeout: A duration of time in seconds to allow for the RPC.
- allowance: The number of payloads (beyond the free first one) that the
- local ticket exchange mate has granted permission to be read.
- options: A beta_interfaces.GRPCCallOptions value or None.
- """
- if termination is links.Ticket.Termination.COMPLETION:
- high_write = _HighWrite.CLOSED
- elif termination is None:
- high_write = _HighWrite.OPEN
- else:
- return
-
- transformed_initial_metadata = self._metadata_transformer(initial_metadata)
- request_serializer = self._request_serializers.get(
- (group, method), _IDENTITY)
- response_deserializer = self._response_deserializers.get(
- (group, method), _IDENTITY)
-
- call = _intermediary_low.Call(
- self._channel, self._completion_queue, '/%s/%s' % (group, method),
- self._host, time.time() + timeout)
- if options is not None and options.credentials is not None:
- call.set_credentials(options.credentials._low_credentials)
- if transformed_initial_metadata is not None:
- for metadata_key, metadata_value in transformed_initial_metadata:
- call.add_metadata(metadata_key, metadata_value)
- call.invoke(self._completion_queue, operation_id, operation_id)
- if payload is None:
- if high_write is _HighWrite.CLOSED:
- call.complete(operation_id)
- low_write = _LowWrite.CLOSED
- due = set((_METADATA, _COMPLETE, _FINISH,))
- else:
- low_write = _LowWrite.OPEN
- due = set((_METADATA, _FINISH,))
- else:
- if options is not None and options.disable_compression:
- flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS
- else:
- flags = 0
- call.write(request_serializer(payload), operation_id, flags)
- low_write = _LowWrite.ACTIVE
- due = set((_WRITE, _METADATA, _FINISH,))
- context = _Context()
- self._rpc_states[operation_id] = _RPCState(
- call, request_serializer, response_deserializer, 1,
- _Read.AWAITING_METADATA, 1 if allowance is None else (1 + allowance),
- high_write, low_write, due, context)
- protocol = links.Protocol(links.Protocol.Kind.INVOCATION_CONTEXT, context)
- ticket = links.Ticket(
- operation_id, 0, None, None, None, None, None, None, None, None, None,
- None, None, protocol)
- self._relay.add_value(ticket)
-
- def _advance(self, operation_id, rpc_state, payload, termination, allowance):
- if payload is not None:
- disable_compression = rpc_state.context.next_compression_disabled()
- if disable_compression:
- flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS
- else:
- flags = 0
- rpc_state.call.write(
- rpc_state.request_serializer(payload), operation_id, flags)
- rpc_state.low_write = _LowWrite.ACTIVE
- rpc_state.due.add(_WRITE)
-
- if allowance is not None:
- if rpc_state.read is _Read.AWAITING_ALLOWANCE:
- rpc_state.allowance += allowance - 1
- rpc_state.call.read(operation_id)
- rpc_state.read = _Read.READING
- rpc_state.due.add(_READ)
- else:
- rpc_state.allowance += allowance
-
- if termination is links.Ticket.Termination.COMPLETION:
- rpc_state.high_write = _HighWrite.CLOSED
- if rpc_state.low_write is _LowWrite.OPEN:
- rpc_state.call.complete(operation_id)
- rpc_state.due.add(_COMPLETE)
- rpc_state.low_write = _LowWrite.CLOSED
- elif termination is not None:
- rpc_state.call.cancel()
-
- def add_ticket(self, ticket):
- with self._lock:
- if ticket.sequence_number == 0:
- if self._completion_queue is None:
- logging.error('Received invocation ticket %s after stop!', ticket)
- else:
- if (ticket.protocol is not None and
- ticket.protocol.kind is links.Protocol.Kind.CALL_OPTION):
- grpc_call_options = ticket.protocol.value
- else:
- grpc_call_options = None
- self._invoke(
- ticket.operation_id, ticket.group, ticket.method,
- ticket.initial_metadata, ticket.payload, ticket.termination,
- ticket.timeout, ticket.allowance, grpc_call_options)
- else:
- rpc_state = self._rpc_states.get(ticket.operation_id)
- if rpc_state is not None:
- self._advance(
- ticket.operation_id, rpc_state, ticket.payload,
- ticket.termination, ticket.allowance)
-
- def start(self):
- """Starts this object.
-
- This method must be called before attempting to exchange tickets with this
- object.
- """
- with self._lock:
- self._completion_queue = _intermediary_low.CompletionQueue()
- self._pool = logging_pool.pool(1)
- self._pool.submit(self._spin, self._completion_queue)
-
- def stop(self):
- """Stops this object.
-
- This method must be called for proper termination of this object, and no
- attempts to exchange tickets with this object may be made after this method
- has been called.
- """
- with self._lock:
- if not self._rpc_states:
- self._completion_queue.stop()
- self._completion_queue = None
- pool = self._pool
- pool.shutdown(wait=True)
-
-
-class InvocationLink(six.with_metaclass(abc.ABCMeta, links.Link, activated.Activated)):
- """A links.Link for use on the invocation-side of a gRPC connection.
-
- Implementations of this interface are only valid for use when activated.
- """
-
-
-class _InvocationLink(InvocationLink):
-
- def __init__(
- self, channel, host, metadata_transformer, request_serializers,
- response_deserializers):
- self._relay = relay.relay(None)
- self._kernel = _Kernel(
- channel, host,
- _IDENTITY if metadata_transformer is None else metadata_transformer,
- {} if request_serializers is None else request_serializers,
- {} if response_deserializers is None else response_deserializers,
- self._relay)
-
- def _start(self):
- self._relay.start()
- self._kernel.start()
- return self
-
- def _stop(self):
- self._kernel.stop()
- self._relay.stop()
-
- def accept_ticket(self, ticket):
- """See links.Link.accept_ticket for specification."""
- self._kernel.add_ticket(ticket)
-
- def join_link(self, link):
- """See links.Link.join_link for specification."""
- self._relay.set_behavior(link.accept_ticket)
-
- def __enter__(self):
- """See activated.Activated.__enter__ for specification."""
- return self._start()
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- """See activated.Activated.__exit__ for specification."""
- self._stop()
- return False
-
- def start(self):
- """See activated.Activated.start for specification."""
- return self._start()
-
- def stop(self):
- """See activated.Activated.stop for specification."""
- self._stop()
-
-
-def invocation_link(
- channel, host, metadata_transformer, request_serializers,
- response_deserializers):
- """Creates an InvocationLink.
-
- Args:
- channel: An _intermediary_low.Channel for use by the link.
- host: The host to specify when invoking RPCs.
- metadata_transformer: A callable that takes an invocation-side initial
- metadata value and returns another metadata value to send in its place.
- May be None.
- request_serializers: A dict from group-method pair to request object
- serialization behavior.
- response_deserializers: A dict from group-method pair to response object
- deserialization behavior.
-
- Returns:
- An InvocationLink.
- """
- return _InvocationLink(
- channel, host, metadata_transformer, request_serializers,
- response_deserializers)
diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py
deleted file mode 100644
index 5fc4994ca0..0000000000
--- a/src/python/grpcio/grpc/_links/service.py
+++ /dev/null
@@ -1,509 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""The RPC-service-side bridge between RPC Framework and GRPC-on-the-wire."""
-
-import abc
-import enum
-import logging
-import threading
-import six
-import time
-
-from grpc._adapter import _intermediary_low
-from grpc._links import _constants
-from grpc.beta import interfaces as beta_interfaces
-from grpc.framework.foundation import logging_pool
-from grpc.framework.foundation import relay
-from grpc.framework.interfaces.links import links
-
-_IDENTITY = lambda x: x
-
-_TERMINATION_KIND_TO_CODE = {
- links.Ticket.Termination.COMPLETION: _intermediary_low.Code.OK,
- links.Ticket.Termination.CANCELLATION: _intermediary_low.Code.CANCELLED,
- links.Ticket.Termination.EXPIRATION:
- _intermediary_low.Code.DEADLINE_EXCEEDED,
- links.Ticket.Termination.SHUTDOWN: _intermediary_low.Code.UNAVAILABLE,
- links.Ticket.Termination.RECEPTION_FAILURE: _intermediary_low.Code.INTERNAL,
- links.Ticket.Termination.TRANSMISSION_FAILURE:
- _intermediary_low.Code.INTERNAL,
- links.Ticket.Termination.LOCAL_FAILURE: _intermediary_low.Code.UNKNOWN,
- links.Ticket.Termination.REMOTE_FAILURE: _intermediary_low.Code.UNKNOWN,
-}
-
-_STOP = _intermediary_low.Event.Kind.STOP
-_WRITE = _intermediary_low.Event.Kind.WRITE_ACCEPTED
-_COMPLETE = _intermediary_low.Event.Kind.COMPLETE_ACCEPTED
-_SERVICE = _intermediary_low.Event.Kind.SERVICE_ACCEPTED
-_READ = _intermediary_low.Event.Kind.READ_ACCEPTED
-_FINISH = _intermediary_low.Event.Kind.FINISH
-
-
-@enum.unique
-class _Read(enum.Enum):
- READING = 'reading'
- # TODO(issue 2916): This state will again be necessary after eliminating the
- # "early_read" field of _RPCState and going back to only reading when granted
- # allowance to read.
- # AWAITING_ALLOWANCE = 'awaiting allowance'
- CLOSED = 'closed'
-
-
-@enum.unique
-class _HighWrite(enum.Enum):
- OPEN = 'open'
- CLOSED = 'closed'
-
-
-@enum.unique
-class _LowWrite(enum.Enum):
- """The possible categories of low-level write state."""
-
- OPEN = 'OPEN'
- ACTIVE = 'ACTIVE'
- CLOSED = 'CLOSED'
-
-
-class _Context(beta_interfaces.GRPCServicerContext):
-
- def __init__(self, call):
- self._lock = threading.Lock()
- self._call = call
- self._disable_next_compression = False
-
- def peer(self):
- with self._lock:
- return self._call.peer()
-
- def disable_next_response_compression(self):
- with self._lock:
- self._disable_next_compression = True
-
- def next_compression_disabled(self):
- with self._lock:
- disabled = self._disable_next_compression
- self._disable_next_compression = False
- return disabled
-
-
-class _RPCState(object):
-
- def __init__(
- self, request_deserializer, response_serializer, sequence_number, read,
- early_read, allowance, high_write, low_write, premetadataed,
- terminal_metadata, code, message, due, context):
- self.request_deserializer = request_deserializer
- self.response_serializer = response_serializer
- self.sequence_number = sequence_number
- self.read = read
- # TODO(issue 2916): Eliminate this by eliminating the necessity of calling
- # call.read just to advance the RPC.
- self.early_read = early_read # A raw (not deserialized) read.
- self.allowance = allowance
- self.high_write = high_write
- self.low_write = low_write
- self.premetadataed = premetadataed
- self.terminal_metadata = terminal_metadata
- self.code = code
- self.message = message
- self.due = due
- self.context = context
-
-
-def _no_longer_due(kind, rpc_state, key, rpc_states):
- rpc_state.due.remove(kind)
- if not rpc_state.due:
- del rpc_states[key]
-
-
-def _metadatafy(call, metadata):
- for metadata_key, metadata_value in metadata:
- call.add_metadata(metadata_key, metadata_value)
-
-
-def _status(termination_kind, high_code, details):
- low_details = b'' if details is None else details
- if high_code is None:
- low_code = _TERMINATION_KIND_TO_CODE[termination_kind]
- else:
- low_code = _constants.HIGH_STATUS_CODE_TO_LOW_STATUS_CODE[high_code]
- return _intermediary_low.Status(low_code, low_details)
-
-
-class _Kernel(object):
-
- def __init__(self, request_deserializers, response_serializers, ticket_relay):
- self._lock = threading.Lock()
- self._request_deserializers = request_deserializers
- self._response_serializers = response_serializers
- self._relay = ticket_relay
-
- self._completion_queue = None
- self._due = set()
- self._server = None
- self._rpc_states = {}
- self._pool = None
-
- def _on_service_acceptance_event(self, event, server):
- server.service(None)
-
- service_acceptance = event.service_acceptance
- call = service_acceptance.call
- call.accept(self._completion_queue, call)
- try:
- service_method = service_acceptance.method
- if six.PY3:
- service_method = service_method.decode('latin1')
- group, method = service_method.split('/')[1:3]
- except ValueError:
- logging.info('Illegal path "%s"!', service_acceptance.method)
- return
- request_deserializer = self._request_deserializers.get(
- (group, method), _IDENTITY)
- response_serializer = self._response_serializers.get(
- (group, method), _IDENTITY)
-
- call.read(call)
- context = _Context(call)
- self._rpc_states[call] = _RPCState(
- request_deserializer, response_serializer, 1, _Read.READING, None, 1,
- _HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None,
- set((_READ, _FINISH,)), context)
- protocol = links.Protocol(links.Protocol.Kind.SERVICER_CONTEXT, context)
- ticket = links.Ticket(
- call, 0, group, method, links.Ticket.Subscription.FULL,
- service_acceptance.deadline - time.time(), None, event.metadata, None,
- None, None, None, None, protocol)
- self._relay.add_value(ticket)
-
- def _on_read_event(self, event):
- call = event.tag
- rpc_state = self._rpc_states[call]
-
- if event.bytes is None:
- rpc_state.read = _Read.CLOSED
- payload = None
- termination = links.Ticket.Termination.COMPLETION
- _no_longer_due(_READ, rpc_state, call, self._rpc_states)
- else:
- if 0 < rpc_state.allowance:
- payload = rpc_state.request_deserializer(event.bytes)
- termination = None
- rpc_state.allowance -= 1
- call.read(call)
- else:
- rpc_state.early_read = event.bytes
- _no_longer_due(_READ, rpc_state, call, self._rpc_states)
- return
- # TODO(issue 2916): Instead of returning:
- # rpc_state.read = _Read.AWAITING_ALLOWANCE
- ticket = links.Ticket(
- call, rpc_state.sequence_number, None, None, None, None, None, None,
- payload, None, None, None, termination, None)
- rpc_state.sequence_number += 1
- self._relay.add_value(ticket)
-
- def _on_write_event(self, event):
- call = event.tag
- rpc_state = self._rpc_states[call]
-
- if rpc_state.high_write is _HighWrite.CLOSED:
- if rpc_state.terminal_metadata is not None:
- _metadatafy(call, rpc_state.terminal_metadata)
- status = _status(
- links.Ticket.Termination.COMPLETION, rpc_state.code,
- rpc_state.message)
- call.status(status, call)
- rpc_state.low_write = _LowWrite.CLOSED
- rpc_state.due.add(_COMPLETE)
- rpc_state.due.remove(_WRITE)
- else:
- ticket = links.Ticket(
- call, rpc_state.sequence_number, None, None, None, None, 1, None,
- None, None, None, None, None, None)
- rpc_state.sequence_number += 1
- self._relay.add_value(ticket)
- rpc_state.low_write = _LowWrite.OPEN
- _no_longer_due(_WRITE, rpc_state, call, self._rpc_states)
-
- def _on_finish_event(self, event):
- call = event.tag
- rpc_state = self._rpc_states[call]
- _no_longer_due(_FINISH, rpc_state, call, self._rpc_states)
- code = event.status.code
- if code == _intermediary_low.Code.OK:
- return
-
- if code == _intermediary_low.Code.CANCELLED:
- termination = links.Ticket.Termination.CANCELLATION
- elif code == _intermediary_low.Code.DEADLINE_EXCEEDED:
- termination = links.Ticket.Termination.EXPIRATION
- else:
- termination = links.Ticket.Termination.TRANSMISSION_FAILURE
- ticket = links.Ticket(
- call, rpc_state.sequence_number, None, None, None, None, None, None,
- None, None, None, None, termination, None)
- rpc_state.sequence_number += 1
- self._relay.add_value(ticket)
-
- def _spin(self, completion_queue, server):
- while True:
- event = completion_queue.get(None)
- with self._lock:
- if event.kind is _STOP:
- self._due.remove(_STOP)
- elif event.kind is _READ:
- self._on_read_event(event)
- elif event.kind is _WRITE:
- self._on_write_event(event)
- elif event.kind is _COMPLETE:
- _no_longer_due(
- _COMPLETE, self._rpc_states.get(event.tag), event.tag,
- self._rpc_states)
- elif event.kind is _intermediary_low.Event.Kind.FINISH:
- self._on_finish_event(event)
- elif event.kind is _SERVICE:
- if self._server is None:
- self._due.remove(_SERVICE)
- else:
- self._on_service_acceptance_event(event, server)
- else:
- logging.error('Illegal event! %s', (event,))
-
- if not self._due and not self._rpc_states:
- completion_queue.stop()
- return
-
- def add_ticket(self, ticket):
- with self._lock:
- call = ticket.operation_id
- rpc_state = self._rpc_states.get(call)
- if rpc_state is None:
- return
-
- if ticket.initial_metadata is not None:
- _metadatafy(call, ticket.initial_metadata)
- call.premetadata()
- rpc_state.premetadataed = True
- elif not rpc_state.premetadataed:
- if (ticket.terminal_metadata is not None or
- ticket.payload is not None or
- ticket.termination is not None or
- ticket.code is not None or
- ticket.message is not None):
- call.premetadata()
- rpc_state.premetadataed = True
-
- if ticket.allowance is not None:
- if rpc_state.early_read is None:
- rpc_state.allowance += ticket.allowance
- else:
- payload = rpc_state.request_deserializer(rpc_state.early_read)
- rpc_state.allowance += ticket.allowance - 1
- rpc_state.early_read = None
- if rpc_state.read is _Read.READING:
- call.read(call)
- rpc_state.due.add(_READ)
- termination = None
- else:
- termination = links.Ticket.Termination.COMPLETION
- early_read_ticket = links.Ticket(
- call, rpc_state.sequence_number, None, None, None, None, None,
- None, payload, None, None, None, termination, None)
- rpc_state.sequence_number += 1
- self._relay.add_value(early_read_ticket)
-
- if ticket.payload is not None:
- disable_compression = rpc_state.context.next_compression_disabled()
- if disable_compression:
- flags = _intermediary_low.WriteFlags.WRITE_NO_COMPRESS
- else:
- flags = 0
- call.write(rpc_state.response_serializer(ticket.payload), call, flags)
- rpc_state.due.add(_WRITE)
- rpc_state.low_write = _LowWrite.ACTIVE
-
- if ticket.terminal_metadata is not None:
- rpc_state.terminal_metadata = ticket.terminal_metadata
- if ticket.code is not None:
- rpc_state.code = ticket.code
- if ticket.message is not None:
- rpc_state.message = ticket.message
-
- if ticket.termination is links.Ticket.Termination.COMPLETION:
- rpc_state.high_write = _HighWrite.CLOSED
- if rpc_state.low_write is _LowWrite.OPEN:
- if rpc_state.terminal_metadata is not None:
- _metadatafy(call, rpc_state.terminal_metadata)
- status = _status(
- links.Ticket.Termination.COMPLETION, rpc_state.code,
- rpc_state.message)
- call.status(status, call)
- rpc_state.due.add(_COMPLETE)
- rpc_state.low_write = _LowWrite.CLOSED
- elif ticket.termination is not None:
- if rpc_state.terminal_metadata is not None:
- _metadatafy(call, rpc_state.terminal_metadata)
- status = _status(
- ticket.termination, rpc_state.code, rpc_state.message)
- call.status(status, call)
- rpc_state.due.add(_COMPLETE)
-
- def add_port(self, address, server_credentials):
- with self._lock:
- if self._server is None:
- self._completion_queue = _intermediary_low.CompletionQueue()
- self._server = _intermediary_low.Server(self._completion_queue)
- if server_credentials is None:
- return self._server.add_http2_addr(address)
- else:
- return self._server.add_secure_http2_addr(address, server_credentials)
-
- def start(self):
- with self._lock:
- if self._server is None:
- self._completion_queue = _intermediary_low.CompletionQueue()
- self._server = _intermediary_low.Server(self._completion_queue)
- self._pool = logging_pool.pool(1)
- self._pool.submit(self._spin, self._completion_queue, self._server)
- self._server.start()
- self._server.service(None)
- self._due.add(_SERVICE)
-
- def begin_stop(self):
- with self._lock:
- self._server.stop()
- self._due.add(_STOP)
- self._server = None
-
- def end_stop(self):
- with self._lock:
- pool = self._pool
- pool.shutdown(wait=True)
-
-
-class ServiceLink(links.Link):
- """A links.Link for use on the service-side of a gRPC connection.
-
- Implementations of this interface are only valid for use between calls to
- their start method and one of their stop methods.
- """
-
- @abc.abstractmethod
- def add_port(self, address, server_credentials):
- """Adds a port on which to service RPCs after this link has been started.
-
- Args:
- address: The address on which to service RPCs with a port number of zero
- requesting that a port number be automatically selected and used.
- server_credentials: An _intermediary_low.ServerCredentials object, or
- None for insecure service.
-
- Returns:
- An integer port on which RPCs will be serviced after this link has been
- started. This is typically the same number as the port number contained
- in the passed address, but will likely be different if the port number
- contained in the passed address was zero.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def start(self):
- """Starts this object.
-
- This method must be called before attempting to use this Link in ticket
- exchange.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def begin_stop(self):
- """Indicate imminent link stop and immediate rejection of new RPCs.
-
- New RPCs will be rejected as soon as this method is called, but ongoing RPCs
- will be allowed to continue until they terminate. This method does not
- block.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def end_stop(self):
- """Finishes stopping this link.
-
- begin_stop must have been called exactly once before calling this method.
-
- All in-progress RPCs will be terminated immediately.
- """
- raise NotImplementedError()
-
-
-class _ServiceLink(ServiceLink):
-
- def __init__(self, request_deserializers, response_serializers):
- self._relay = relay.relay(None)
- self._kernel = _Kernel(
- {} if request_deserializers is None else request_deserializers,
- {} if response_serializers is None else response_serializers,
- self._relay)
-
- def accept_ticket(self, ticket):
- self._kernel.add_ticket(ticket)
-
- def join_link(self, link):
- self._relay.set_behavior(link.accept_ticket)
-
- def add_port(self, address, server_credentials):
- return self._kernel.add_port(address, server_credentials)
-
- def start(self):
- self._relay.start()
- return self._kernel.start()
-
- def begin_stop(self):
- self._kernel.begin_stop()
-
- def end_stop(self):
- self._kernel.end_stop()
- self._relay.stop()
-
-
-def service_link(request_deserializers, response_serializers):
- """Creates a ServiceLink.
-
- Args:
- request_deserializers: A dict from group-method pair to request object
- deserialization behavior.
- response_serializers: A dict from group-method pair to response ojbect
- serialization behavior.
-
- Returns:
- A ServiceLink.
- """
- return _ServiceLink(request_deserializers, response_serializers)
diff --git a/src/python/grpcio/grpc/beta/_server.py b/src/python/grpcio/grpc/beta/_server.py
deleted file mode 100644
index eb0aadb42f..0000000000
--- a/src/python/grpcio/grpc/beta/_server.py
+++ /dev/null
@@ -1,209 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Beta API server implementation."""
-
-import threading
-
-from grpc._links import service
-from grpc.beta import interfaces
-from grpc.framework.core import implementations as _core_implementations
-from grpc.framework.crust import implementations as _crust_implementations
-from grpc.framework.foundation import logging_pool
-from grpc.framework.interfaces.base import base
-from grpc.framework.interfaces.links import utilities
-
-_DEFAULT_POOL_SIZE = 8
-_DEFAULT_TIMEOUT = 300
-_MAXIMUM_TIMEOUT = 24 * 60 * 60
-
-
-def _set_event():
- event = threading.Event()
- event.set()
- return event
-
-
-class _GRPCServicer(base.Servicer):
-
- def __init__(self, delegate):
- self._delegate = delegate
-
- def service(self, group, method, context, output_operator):
- try:
- return self._delegate.service(group, method, context, output_operator)
- except base.NoSuchMethodError as e:
- if e.code is None and e.details is None:
- raise base.NoSuchMethodError(
- interfaces.StatusCode.UNIMPLEMENTED,
- 'Method "%s" of service "%s" not implemented!' % (method, group))
- else:
- raise
-
-
-class _Server(interfaces.Server):
-
- def __init__(
- self, implementations, multi_implementation, pool, pool_size,
- default_timeout, maximum_timeout, grpc_link):
- self._lock = threading.Lock()
- self._implementations = implementations
- self._multi_implementation = multi_implementation
- self._customer_pool = pool
- self._pool_size = pool_size
- self._default_timeout = default_timeout
- self._maximum_timeout = maximum_timeout
- self._grpc_link = grpc_link
-
- self._end_link = None
- self._stop_events = None
- self._pool = None
-
- def _start(self):
- with self._lock:
- if self._end_link is not None:
- raise ValueError('Cannot start already-started server!')
-
- if self._customer_pool is None:
- self._pool = logging_pool.pool(self._pool_size)
- assembly_pool = self._pool
- else:
- assembly_pool = self._customer_pool
-
- servicer = _GRPCServicer(
- _crust_implementations.servicer(
- self._implementations, self._multi_implementation, assembly_pool))
-
- self._end_link = _core_implementations.service_end_link(
- servicer, self._default_timeout, self._maximum_timeout)
-
- self._grpc_link.join_link(self._end_link)
- self._end_link.join_link(self._grpc_link)
- self._grpc_link.start()
- self._end_link.start()
-
- def _dissociate_links_and_shut_down_pool(self):
- self._grpc_link.end_stop()
- self._grpc_link.join_link(utilities.NULL_LINK)
- self._end_link.join_link(utilities.NULL_LINK)
- self._end_link = None
- if self._pool is not None:
- self._pool.shutdown(wait=True)
- self._pool = None
-
- def _stop_stopping(self):
- self._dissociate_links_and_shut_down_pool()
- for stop_event in self._stop_events:
- stop_event.set()
- self._stop_events = None
-
- def _stop_started(self):
- self._grpc_link.begin_stop()
- self._end_link.stop(0).wait()
- self._dissociate_links_and_shut_down_pool()
-
- def _foreign_thread_stop(self, end_stop_event, stop_events):
- end_stop_event.wait()
- with self._lock:
- if self._stop_events is stop_events:
- self._stop_stopping()
-
- def _schedule_stop(self, grace):
- with self._lock:
- if self._end_link is None:
- return _set_event()
- server_stop_event = threading.Event()
- if self._stop_events is None:
- self._stop_events = [server_stop_event]
- self._grpc_link.begin_stop()
- else:
- self._stop_events.append(server_stop_event)
- end_stop_event = self._end_link.stop(grace)
- end_stop_thread = threading.Thread(
- target=self._foreign_thread_stop,
- args=(end_stop_event, self._stop_events))
- end_stop_thread.start()
- return server_stop_event
-
- def _stop_now(self):
- with self._lock:
- if self._end_link is not None:
- if self._stop_events is None:
- self._stop_started()
- else:
- self._stop_stopping()
-
- def add_insecure_port(self, address):
- with self._lock:
- if self._end_link is None:
- return self._grpc_link.add_port(address, None)
- else:
- raise ValueError('Can\'t add port to serving server!')
-
- def add_secure_port(self, address, server_credentials):
- with self._lock:
- if self._end_link is None:
- return self._grpc_link.add_port(
- address, server_credentials._low_credentials) # pylint: disable=protected-access
- else:
- raise ValueError('Can\'t add port to serving server!')
-
- def start(self):
- self._start()
-
- def stop(self, grace):
- if 0 < grace:
- return self._schedule_stop(grace)
- else:
- self._stop_now()
- return _set_event()
-
- def __enter__(self):
- self._start()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._stop_now()
- return False
-
- def __del__(self):
- self._stop_now()
-
-
-def server(
- implementations, multi_implementation, request_deserializers,
- response_serializers, thread_pool, thread_pool_size, default_timeout,
- maximum_timeout):
- grpc_link = service.service_link(request_deserializers, response_serializers)
- return _Server(
- implementations, multi_implementation, thread_pool,
- _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size,
- _DEFAULT_TIMEOUT if default_timeout is None else default_timeout,
- _MAXIMUM_TIMEOUT if maximum_timeout is None else maximum_timeout,
- grpc_link)
diff --git a/src/python/grpcio/grpc/beta/_stub.py b/src/python/grpcio/grpc/beta/_stub.py
deleted file mode 100644
index 2af019309a..0000000000
--- a/src/python/grpcio/grpc/beta/_stub.py
+++ /dev/null
@@ -1,155 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Beta API stub implementation."""
-
-import threading
-
-from grpc._links import invocation
-from grpc.framework.core import implementations as _core_implementations
-from grpc.framework.crust import implementations as _crust_implementations
-from grpc.framework.foundation import logging_pool
-from grpc.framework.interfaces.links import utilities
-
-_DEFAULT_POOL_SIZE = 6
-
-
-class _AutoIntermediary(object):
-
- def __init__(self, up, down, delegate):
- self._lock = threading.Lock()
- self._up = up
- self._down = down
- self._in_context = False
- self._delegate = delegate
-
- def __getattr__(self, attr):
- with self._lock:
- if self._delegate is None:
- raise AttributeError('No useful attributes out of context!')
- else:
- return getattr(self._delegate, attr)
-
- def __enter__(self):
- with self._lock:
- if self._in_context:
- raise ValueError('Already in context!')
- elif self._delegate is None:
- self._delegate = self._up()
- self._in_context = True
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- with self._lock:
- if not self._in_context:
- raise ValueError('Not in context!')
- self._down()
- self._in_context = False
- self._delegate = None
- return False
-
- def __del__(self):
- with self._lock:
- if self._delegate is not None:
- self._down()
- self._delegate = None
-
-
-class _StubAssemblyManager(object):
-
- def __init__(
- self, thread_pool, thread_pool_size, end_link, grpc_link, stub_creator):
- self._thread_pool = thread_pool
- self._pool_size = thread_pool_size
- self._end_link = end_link
- self._grpc_link = grpc_link
- self._stub_creator = stub_creator
- self._own_pool = None
-
- def up(self):
- if self._thread_pool is None:
- self._own_pool = logging_pool.pool(
- _DEFAULT_POOL_SIZE if self._pool_size is None else self._pool_size)
- assembly_pool = self._own_pool
- else:
- assembly_pool = self._thread_pool
- self._end_link.join_link(self._grpc_link)
- self._grpc_link.join_link(self._end_link)
- self._end_link.start()
- self._grpc_link.start()
- return self._stub_creator(self._end_link, assembly_pool)
-
- def down(self):
- self._end_link.stop(0).wait()
- self._grpc_link.stop()
- self._end_link.join_link(utilities.NULL_LINK)
- self._grpc_link.join_link(utilities.NULL_LINK)
- if self._own_pool is not None:
- self._own_pool.shutdown(wait=True)
- self._own_pool = None
-
-
-def _assemble(
- channel, host, metadata_transformer, request_serializers,
- response_deserializers, thread_pool, thread_pool_size, stub_creator):
- end_link = _core_implementations.invocation_end_link()
- grpc_link = invocation.invocation_link(
- channel, host, metadata_transformer, request_serializers,
- response_deserializers)
- stub_assembly_manager = _StubAssemblyManager(
- thread_pool, thread_pool_size, end_link, grpc_link, stub_creator)
- stub = stub_assembly_manager.up()
- return _AutoIntermediary(
- stub_assembly_manager.up, stub_assembly_manager.down, stub)
-
-
-def _dynamic_stub_creator(service, cardinalities):
- def create_dynamic_stub(end_link, invocation_pool):
- return _crust_implementations.dynamic_stub(
- end_link, service, cardinalities, invocation_pool)
- return create_dynamic_stub
-
-
-def generic_stub(
- channel, host, metadata_transformer, request_serializers,
- response_deserializers, thread_pool, thread_pool_size):
- return _assemble(
- channel, host, metadata_transformer, request_serializers,
- response_deserializers, thread_pool, thread_pool_size,
- _crust_implementations.generic_stub)
-
-
-def dynamic_stub(
- channel, host, service, cardinalities, metadata_transformer,
- request_serializers, response_deserializers, thread_pool,
- thread_pool_size):
- return _assemble(
- channel, host, metadata_transformer, request_serializers,
- response_deserializers, thread_pool, thread_pool_size,
- _dynamic_stub_creator(service, cardinalities))
diff --git a/src/python/grpcio/grpc/beta/implementations.py b/src/python/grpcio/grpc/beta/implementations.py
index 4ae6e7d675..ab25fd5eec 100644
--- a/src/python/grpcio/grpc/beta/implementations.py
+++ b/src/python/grpcio/grpc/beta/implementations.py
@@ -37,7 +37,6 @@ import threading # pylint: disable=unused-import
# cardinality and face are referenced from specification in this module.
import grpc
from grpc import _auth
-from grpc._adapter import _types
from grpc.beta import _client_adaptations
from grpc.beta import _server_adaptations
from grpc.beta import interfaces
diff --git a/src/python/grpcio/grpc/framework/core/__init__.py b/src/python/grpcio/grpc/framework/core/__init__.py
deleted file mode 100644
index 7086519106..0000000000
--- a/src/python/grpcio/grpc/framework/core/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py
deleted file mode 100644
index 0f47cb48e0..0000000000
--- a/src/python/grpcio/grpc/framework/core/_constants.py
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Private constants for the package."""
-
-from grpc.framework.interfaces.base import base
-from grpc.framework.interfaces.links import links
-
-TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = {
- base.Subscription.Kind.NONE: links.Ticket.Subscription.NONE,
- base.Subscription.Kind.TERMINATION_ONLY:
- links.Ticket.Subscription.TERMINATION,
- base.Subscription.Kind.FULL: links.Ticket.Subscription.FULL,
- }
-
-# Mapping from abortive operation outcome to ticket termination to be
-# sent to the other side of the operation, or None to indicate that no
-# ticket should be sent to the other side in the event of such an
-# outcome.
-ABORTION_OUTCOME_TO_TICKET_TERMINATION = {
- base.Outcome.Kind.CANCELLED: links.Ticket.Termination.CANCELLATION,
- base.Outcome.Kind.EXPIRED: links.Ticket.Termination.EXPIRATION,
- base.Outcome.Kind.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN,
- base.Outcome.Kind.REMOTE_SHUTDOWN: None,
- base.Outcome.Kind.RECEPTION_FAILURE:
- links.Ticket.Termination.RECEPTION_FAILURE,
- base.Outcome.Kind.TRANSMISSION_FAILURE: None,
- base.Outcome.Kind.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE,
- base.Outcome.Kind.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE,
-}
-
-INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:'
-TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE = (
- 'Exception calling termination callback!')
diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py
deleted file mode 100644
index a346e9d478..0000000000
--- a/src/python/grpcio/grpc/framework/core/_context.py
+++ /dev/null
@@ -1,94 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""State and behavior for operation context."""
-
-import time
-
-# _interfaces is referenced from specification in this module.
-from grpc.framework.core import _interfaces # pylint: disable=unused-import
-from grpc.framework.core import _utilities
-from grpc.framework.interfaces.base import base
-
-
-class OperationContext(base.OperationContext):
- """An implementation of interfaces.OperationContext."""
-
- def __init__(
- self, lock, termination_manager, transmission_manager,
- expiration_manager):
- """Constructor.
-
- Args:
- lock: The operation-wide lock.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
- expiration_manager: The _interfaces.ExpirationManager for the operation.
- """
- self._lock = lock
- self._termination_manager = termination_manager
- self._transmission_manager = transmission_manager
- self._expiration_manager = expiration_manager
-
- def _abort(self, outcome_kind):
- with self._lock:
- if self._termination_manager.outcome is None:
- outcome = _utilities.Outcome(outcome_kind, None, None)
- self._termination_manager.abort(outcome)
- self._transmission_manager.abort(outcome)
- self._expiration_manager.terminate()
-
- def outcome(self):
- """See base.OperationContext.outcome for specification."""
- with self._lock:
- return self._termination_manager.outcome
-
- def add_termination_callback(self, callback):
- """See base.OperationContext.add_termination_callback."""
- with self._lock:
- if self._termination_manager.outcome is None:
- self._termination_manager.add_callback(callback)
- return None
- else:
- return self._termination_manager.outcome
-
- def time_remaining(self):
- """See base.OperationContext.time_remaining for specification."""
- with self._lock:
- deadline = self._expiration_manager.deadline()
- return max(0.0, deadline - time.time())
-
- def cancel(self):
- """See base.OperationContext.cancel for specification."""
- self._abort(base.Outcome.Kind.CANCELLED)
-
- def fail(self, exception):
- """See base.OperationContext.fail for specification."""
- self._abort(base.Outcome.Kind.LOCAL_FAILURE)
diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py
deleted file mode 100644
index 8ab59dc3e5..0000000000
--- a/src/python/grpcio/grpc/framework/core/_emission.py
+++ /dev/null
@@ -1,100 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""State and behavior for handling emitted values."""
-
-from grpc.framework.core import _interfaces
-from grpc.framework.core import _utilities
-from grpc.framework.interfaces.base import base
-
-
-class EmissionManager(_interfaces.EmissionManager):
- """An EmissionManager implementation."""
-
- def __init__(
- self, lock, termination_manager, transmission_manager,
- expiration_manager):
- """Constructor.
-
- Args:
- lock: The operation-wide lock.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
- expiration_manager: The _interfaces.ExpirationManager for the operation.
- """
- self._lock = lock
- self._termination_manager = termination_manager
- self._transmission_manager = transmission_manager
- self._expiration_manager = expiration_manager
- self._ingestion_manager = None
-
- self._initial_metadata_seen = False
- self._payload_seen = False
- self._completion_seen = False
-
- def set_ingestion_manager(self, ingestion_manager):
- """Sets the ingestion manager with which this manager will cooperate.
-
- Args:
- ingestion_manager: The _interfaces.IngestionManager for the operation.
- """
- self._ingestion_manager = ingestion_manager
-
- def advance(
- self, initial_metadata=None, payload=None, completion=None,
- allowance=None):
- initial_metadata_present = initial_metadata is not None
- payload_present = payload is not None
- completion_present = completion is not None
- allowance_present = allowance is not None
- with self._lock:
- if self._termination_manager.outcome is None:
- if (initial_metadata_present and (
- self._initial_metadata_seen or self._payload_seen or
- self._completion_seen) or
- payload_present and self._completion_seen or
- completion_present and self._completion_seen or
- allowance_present and allowance <= 0):
- outcome = _utilities.Outcome(
- base.Outcome.Kind.LOCAL_FAILURE, None, None)
- self._termination_manager.abort(outcome)
- self._transmission_manager.abort(outcome)
- self._expiration_manager.terminate()
- else:
- self._initial_metadata_seen |= initial_metadata_present
- self._payload_seen |= payload_present
- self._completion_seen |= completion_present
- if completion_present:
- self._termination_manager.emission_complete()
- self._ingestion_manager.local_emissions_done()
- self._transmission_manager.advance(
- initial_metadata, payload, completion, allowance)
- if allowance_present:
- self._ingestion_manager.add_local_allowance(allowance)
diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py
deleted file mode 100644
index 009d27c915..0000000000
--- a/src/python/grpcio/grpc/framework/core/_end.py
+++ /dev/null
@@ -1,244 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Implementation of base.End."""
-
-import abc
-import threading
-import uuid
-
-import six
-
-from grpc.framework.core import _operation
-from grpc.framework.core import _utilities
-from grpc.framework.foundation import callable_util
-from grpc.framework.foundation import later
-from grpc.framework.foundation import logging_pool
-from grpc.framework.interfaces.base import base
-from grpc.framework.interfaces.links import links
-from grpc.framework.interfaces.links import utilities
-
-_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
-
-
-class End(six.with_metaclass(abc.ABCMeta, base.End, links.Link)):
- """A bridge between base.End and links.Link.
-
- Implementations of this interface translate arriving tickets into
- calls on application objects implementing base interfaces and
- translate calls from application objects implementing base interfaces
- into tickets sent to a joined link.
- """
-
-
-class _Cycle(object):
- """State for a single start-stop End lifecycle."""
-
- def __init__(self, pool):
- self.pool = pool
- self.grace = False
- self.futures = []
- self.operations = {}
- self.idle_actions = []
-
-
-def _abort(operations):
- for operation in operations:
- operation.abort(base.Outcome.Kind.LOCAL_SHUTDOWN)
-
-
-def _cancel_futures(futures):
- for future in futures:
- future.cancel()
-
-
-def _future_shutdown(lock, cycle, event):
- def in_future():
- with lock:
- _abort(cycle.operations.values())
- _cancel_futures(cycle.futures)
- return in_future
-
-
-class _End(End):
- """An End implementation."""
-
- def __init__(self, servicer_package):
- """Constructor.
-
- Args:
- servicer_package: A _ServicerPackage for servicing operations or None if
- this end will not be used to service operations.
- """
- self._lock = threading.Condition()
- self._servicer_package = servicer_package
-
- self._stats = {outcome_kind: 0 for outcome_kind in base.Outcome.Kind}
-
- self._mate = None
-
- self._cycle = None
-
- def _termination_action(self, operation_id):
- """Constructs the termination action for a single operation.
-
- Args:
- operation_id: The operation ID for the termination action.
-
- Returns:
- A callable that takes an operation outcome kind as its sole parameter and
- that should be used as the termination action for the operation
- associated with the given operation ID.
- """
- def termination_action(outcome_kind):
- with self._lock:
- self._stats[outcome_kind] += 1
- self._cycle.operations.pop(operation_id, None)
- if not self._cycle.operations:
- for action in self._cycle.idle_actions:
- self._cycle.pool.submit(action)
- self._cycle.idle_actions = []
- if self._cycle.grace:
- _cancel_futures(self._cycle.futures)
- self._cycle.pool.shutdown(wait=False)
- self._cycle = None
- return termination_action
-
- def start(self):
- """See base.End.start for specification."""
- with self._lock:
- if self._cycle is not None:
- raise ValueError('Tried to start a not-stopped End!')
- else:
- self._cycle = _Cycle(logging_pool.pool(1))
-
- def stop(self, grace):
- """See base.End.stop for specification."""
- with self._lock:
- if self._cycle is None:
- event = threading.Event()
- event.set()
- return event
- elif not self._cycle.operations:
- event = threading.Event()
- self._cycle.pool.submit(event.set)
- self._cycle.pool.shutdown(wait=False)
- self._cycle = None
- return event
- else:
- self._cycle.grace = True
- event = threading.Event()
- self._cycle.idle_actions.append(event.set)
- if 0 < grace:
- future = later.later(
- grace, _future_shutdown(self._lock, self._cycle, event))
- self._cycle.futures.append(future)
- else:
- _abort(self._cycle.operations.values())
- return event
-
- def operate(
- self, group, method, subscription, timeout, initial_metadata=None,
- payload=None, completion=None, protocol_options=None):
- """See base.End.operate for specification."""
- operation_id = uuid.uuid4()
- with self._lock:
- if self._cycle is None or self._cycle.grace:
- raise ValueError('Can\'t operate on stopped or stopping End!')
- termination_action = self._termination_action(operation_id)
- operation = _operation.invocation_operate(
- operation_id, group, method, subscription, timeout, protocol_options,
- initial_metadata, payload, completion, self._mate.accept_ticket,
- termination_action, self._cycle.pool)
- self._cycle.operations[operation_id] = operation
- return operation.context, operation.operator
-
- def operation_stats(self):
- """See base.End.operation_stats for specification."""
- with self._lock:
- return dict(self._stats)
-
- def add_idle_action(self, action):
- """See base.End.add_idle_action for specification."""
- with self._lock:
- if self._cycle is None:
- raise ValueError('Can\'t add idle action to stopped End!')
- action_with_exceptions_logged = callable_util.with_exceptions_logged(
- action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE)
- if self._cycle.operations:
- self._cycle.idle_actions.append(action_with_exceptions_logged)
- else:
- self._cycle.pool.submit(action_with_exceptions_logged)
-
- def accept_ticket(self, ticket):
- """See links.Link.accept_ticket for specification."""
- with self._lock:
- if self._cycle is not None:
- operation = self._cycle.operations.get(ticket.operation_id)
- if operation is not None:
- operation.handle_ticket(ticket)
- elif self._servicer_package is not None and not self._cycle.grace:
- termination_action = self._termination_action(ticket.operation_id)
- operation = _operation.service_operate(
- self._servicer_package, ticket, self._mate.accept_ticket,
- termination_action, self._cycle.pool)
- if operation is not None:
- self._cycle.operations[ticket.operation_id] = operation
-
- def join_link(self, link):
- """See links.Link.join_link for specification."""
- with self._lock:
- self._mate = utilities.NULL_LINK if link is None else link
-
-
-def serviceless_end_link():
- """Constructs an End usable only for invoking operations.
-
- Returns:
- An End usable for translating operations into ticket exchange.
- """
- return _End(None)
-
-
-def serviceful_end_link(servicer, default_timeout, maximum_timeout):
- """Constructs an End capable of servicing operations.
-
- Args:
- servicer: An interfaces.Servicer for servicing operations.
- default_timeout: A length of time in seconds to be used as the default
- time alloted for a single operation.
- maximum_timeout: A length of time in seconds to be used as the maximum
- time alloted for a single operation.
-
- Returns:
- An End capable of servicing the operations requested of it through ticket
- exchange.
- """
- return _End(
- _utilities.ServicerPackage(servicer, default_timeout, maximum_timeout))
diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py
deleted file mode 100644
index ded0ab6bce..0000000000
--- a/src/python/grpcio/grpc/framework/core/_expiration.py
+++ /dev/null
@@ -1,154 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""State and behavior for operation expiration."""
-
-import time
-
-from grpc.framework.core import _interfaces
-from grpc.framework.core import _utilities
-from grpc.framework.foundation import later
-from grpc.framework.interfaces.base import base
-
-
-class _ExpirationManager(_interfaces.ExpirationManager):
- """An implementation of _interfaces.ExpirationManager."""
-
- def __init__(
- self, commencement, timeout, maximum_timeout, lock, termination_manager,
- transmission_manager):
- """Constructor.
-
- Args:
- commencement: The time in seconds since the epoch at which the operation
- began.
- timeout: A length of time in seconds to allow for the operation to run.
- maximum_timeout: The maximum length of time in seconds to allow for the
- operation to run despite what is requested via this object's
- change_timout method.
- lock: The operation-wide lock.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
- """
- self._lock = lock
- self._termination_manager = termination_manager
- self._transmission_manager = transmission_manager
- self._commencement = commencement
- self._maximum_timeout = maximum_timeout
-
- self._timeout = timeout
- self._deadline = commencement + timeout
- self._index = None
- self._future = None
-
- def _expire(self, index):
- def expire():
- with self._lock:
- if self._future is not None and index == self._index:
- self._future = None
- self._termination_manager.expire()
- self._transmission_manager.abort(
- _utilities.Outcome(base.Outcome.Kind.EXPIRED, None, None))
- return expire
-
- def start(self):
- self._index = 0
- self._future = later.later(self._timeout, self._expire(0))
-
- def change_timeout(self, timeout):
- if self._future is not None and timeout != self._timeout:
- self._future.cancel()
- new_timeout = min(timeout, self._maximum_timeout)
- new_index = self._index + 1
- self._timeout = new_timeout
- self._deadline = self._commencement + new_timeout
- self._index = new_index
- delay = self._deadline - time.time()
- self._future = later.later(delay, self._expire(new_index))
- if new_timeout != timeout:
- self._transmission_manager.timeout(new_timeout)
-
- def deadline(self):
- return self._deadline
-
- def terminate(self):
- if self._future:
- self._future.cancel()
- self._future = None
- self._deadline_index = None
-
-
-def invocation_expiration_manager(
- timeout, lock, termination_manager, transmission_manager):
- """Creates an _interfaces.ExpirationManager appropriate for front-side use.
-
- Args:
- timeout: A length of time in seconds to allow for the operation to run.
- lock: The operation-wide lock.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
-
- Returns:
- An _interfaces.ExpirationManager appropriate for invocation-side use.
- """
- expiration_manager = _ExpirationManager(
- time.time(), timeout, timeout, lock, termination_manager,
- transmission_manager)
- expiration_manager.start()
- return expiration_manager
-
-
-def service_expiration_manager(
- timeout, default_timeout, maximum_timeout, lock, termination_manager,
- transmission_manager):
- """Creates an _interfaces.ExpirationManager appropriate for back-side use.
-
- Args:
- timeout: A length of time in seconds to allow for the operation to run. May
- be None in which case default_timeout will be used.
- default_timeout: The default length of time in seconds to allow for the
- operation to run if the front-side customer has not specified such a value
- (or if the value they specified is not yet known).
- maximum_timeout: The maximum length of time in seconds to allow for the
- operation to run.
- lock: The operation-wide lock.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
-
- Returns:
- An _interfaces.ExpirationManager appropriate for service-side use.
- """
- expiration_manager = _ExpirationManager(
- time.time(), default_timeout if timeout is None else timeout,
- maximum_timeout, lock, termination_manager, transmission_manager)
- expiration_manager.start()
- return expiration_manager
diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py
deleted file mode 100644
index f2767c981b..0000000000
--- a/src/python/grpcio/grpc/framework/core/_ingestion.py
+++ /dev/null
@@ -1,439 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""State and behavior for ingestion during an operation."""
-
-import abc
-import collections
-import enum
-
-import six
-
-from grpc.framework.core import _constants
-from grpc.framework.core import _interfaces
-from grpc.framework.core import _utilities
-from grpc.framework.foundation import abandonment
-from grpc.framework.foundation import callable_util
-from grpc.framework.interfaces.base import base
-
-_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!'
-_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
-
-
-class _SubscriptionCreation(
- collections.namedtuple(
- '_SubscriptionCreation',
- ('kind', 'subscription', 'code', 'details',))):
- """A sum type for the outcome of ingestion initialization.
-
- Attributes:
- kind: A Kind value coarsely indicating how subscription creation completed.
- subscription: The created subscription. Only present if kind is
- Kind.SUBSCRIPTION.
- code: A code value to be sent to the other side of the operation along with
- an indication that the operation is being aborted due to an error on the
- remote side of the operation. Only present if kind is Kind.REMOTE_ERROR.
- details: A details value to be sent to the other side of the operation
- along with an indication that the operation is being aborted due to an
- error on the remote side of the operation. Only present if kind is
- Kind.REMOTE_ERROR.
- """
-
- @enum.unique
- class Kind(enum.Enum):
- SUBSCRIPTION = 'subscription'
- REMOTE_ERROR = 'remote error'
- ABANDONED = 'abandoned'
-
-
-class _SubscriptionCreator(six.with_metaclass(abc.ABCMeta)):
- """Common specification of subscription-creating behavior."""
-
- @abc.abstractmethod
- def create(self, group, method):
- """Creates the base.Subscription of the local customer.
-
- Any exceptions raised by this method should be attributed to and treated as
- defects in the customer code called by this method.
-
- Args:
- group: The group identifier of the operation.
- method: The method identifier of the operation.
-
- Returns:
- A _SubscriptionCreation describing the result of subscription creation.
- """
- raise NotImplementedError()
-
-
-class _ServiceSubscriptionCreator(_SubscriptionCreator):
- """A _SubscriptionCreator appropriate for service-side use."""
-
- def __init__(self, servicer, operation_context, output_operator):
- """Constructor.
-
- Args:
- servicer: The base.Servicer that will service the operation.
- operation_context: A base.OperationContext for the operation to be passed
- to the customer.
- output_operator: A base.Operator for the operation to be passed to the
- customer and to be called by the customer to accept operation data
- emitted by the customer.
- """
- self._servicer = servicer
- self._operation_context = operation_context
- self._output_operator = output_operator
-
- def create(self, group, method):
- try:
- subscription = self._servicer.service(
- group, method, self._operation_context, self._output_operator)
- except base.NoSuchMethodError as e:
- return _SubscriptionCreation(
- _SubscriptionCreation.Kind.REMOTE_ERROR, None, e.code, e.details)
- except abandonment.Abandoned:
- return _SubscriptionCreation(
- _SubscriptionCreation.Kind.ABANDONED, None, None, None)
- else:
- return _SubscriptionCreation(
- _SubscriptionCreation.Kind.SUBSCRIPTION, subscription, None, None)
-
-
-def _wrap(behavior):
- def wrapped(*args, **kwargs):
- try:
- behavior(*args, **kwargs)
- except abandonment.Abandoned:
- return False
- else:
- return True
- return wrapped
-
-
-class _IngestionManager(_interfaces.IngestionManager):
- """An implementation of _interfaces.IngestionManager."""
-
- def __init__(
- self, lock, pool, subscription, subscription_creator, termination_manager,
- transmission_manager, expiration_manager, protocol_manager):
- """Constructor.
-
- Args:
- lock: The operation-wide lock.
- pool: A thread pool in which to execute customer code.
- subscription: A base.Subscription describing the customer's interest in
- operation values from the other side. May be None if
- subscription_creator is not None.
- subscription_creator: A _SubscriptionCreator wrapping the portion of
- customer code that when called returns the base.Subscription describing
- the customer's interest in operation values from the other side. May be
- None if subscription is not None.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
- expiration_manager: The _interfaces.ExpirationManager for the operation.
- protocol_manager: The _interfaces.ProtocolManager for the operation.
- """
- self._lock = lock
- self._pool = pool
- self._termination_manager = termination_manager
- self._transmission_manager = transmission_manager
- self._expiration_manager = expiration_manager
- self._protocol_manager = protocol_manager
-
- if subscription is None:
- self._subscription_creator = subscription_creator
- self._wrapped_operator = None
- elif subscription.kind is base.Subscription.Kind.FULL:
- self._subscription_creator = None
- self._wrapped_operator = _wrap(subscription.operator.advance)
- else:
- # TODO(nathaniel): Support other subscriptions.
- raise ValueError('Unsupported subscription "%s"!' % subscription.kind)
- self._pending_initial_metadata = None
- self._pending_payloads = []
- self._pending_completion = None
- self._local_allowance = 1
- # A nonnegative integer or None, with None indicating that the local
- # customer is done emitting anyway so there's no need to bother it by
- # informing it that the remote customer has granted it further permission to
- # emit.
- self._remote_allowance = 0
- self._processing = False
-
- def _abort_internal_only(self):
- self._subscription_creator = None
- self._wrapped_operator = None
- self._pending_initial_metadata = None
- self._pending_payloads = None
- self._pending_completion = None
-
- def _abort_and_notify(self, outcome_kind, code, details):
- self._abort_internal_only()
- if self._termination_manager.outcome is None:
- outcome = _utilities.Outcome(outcome_kind, code, details)
- self._termination_manager.abort(outcome)
- self._transmission_manager.abort(outcome)
- self._expiration_manager.terminate()
-
- def _operator_next(self):
- """Computes the next step for full-subscription ingestion.
-
- Returns:
- An initial_metadata, payload, completion, allowance, continue quintet
- indicating what operation values (if any) are available to pass into
- customer code and whether or not there is anything immediately
- actionable to call customer code to do.
- """
- if self._wrapped_operator is None:
- return None, None, None, None, False
- else:
- initial_metadata, payload, completion, allowance, action = [None] * 5
- if self._pending_initial_metadata is not None:
- initial_metadata = self._pending_initial_metadata
- self._pending_initial_metadata = None
- action = True
- if self._pending_payloads and 0 < self._local_allowance:
- payload = self._pending_payloads.pop(0)
- self._local_allowance -= 1
- action = True
- if not self._pending_payloads and self._pending_completion is not None:
- completion = self._pending_completion
- self._pending_completion = None
- action = True
- if self._remote_allowance is not None and 0 < self._remote_allowance:
- allowance = self._remote_allowance
- self._remote_allowance = 0
- action = True
- return initial_metadata, payload, completion, allowance, bool(action)
-
- def _operator_process(
- self, wrapped_operator, initial_metadata, payload,
- completion, allowance):
- while True:
- advance_outcome = callable_util.call_logging_exceptions(
- wrapped_operator, _INGESTION_EXCEPTION_LOG_MESSAGE,
- initial_metadata=initial_metadata, payload=payload,
- completion=completion, allowance=allowance)
- if advance_outcome.exception is None:
- if advance_outcome.return_value:
- with self._lock:
- if self._termination_manager.outcome is not None:
- return
- if completion is not None:
- self._termination_manager.ingestion_complete()
- initial_metadata, payload, completion, allowance, moar = (
- self._operator_next())
- if not moar:
- self._processing = False
- return
- else:
- with self._lock:
- if self._termination_manager.outcome is None:
- self._abort_and_notify(
- base.Outcome.Kind.LOCAL_FAILURE, None, None)
- return
- else:
- with self._lock:
- if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None)
- return
-
- def _operator_post_create(self, subscription):
- wrapped_operator = _wrap(subscription.operator.advance)
- with self._lock:
- if self._termination_manager.outcome is not None:
- return
- self._wrapped_operator = wrapped_operator
- self._subscription_creator = None
- metadata, payload, completion, allowance, moar = self._operator_next()
- if not moar:
- self._processing = False
- return
- self._operator_process(
- wrapped_operator, metadata, payload, completion, allowance)
-
- def _create(self, subscription_creator, group, name):
- outcome = callable_util.call_logging_exceptions(
- subscription_creator.create,
- _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE, group, name)
- if outcome.return_value is None:
- with self._lock:
- if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None)
- elif outcome.return_value.kind is _SubscriptionCreation.Kind.ABANDONED:
- with self._lock:
- if self._termination_manager.outcome is None:
- self._abort_and_notify(base.Outcome.Kind.LOCAL_FAILURE, None, None)
- elif outcome.return_value.kind is _SubscriptionCreation.Kind.REMOTE_ERROR:
- code = outcome.return_value.code
- details = outcome.return_value.details
- with self._lock:
- if self._termination_manager.outcome is None:
- self._abort_and_notify(
- base.Outcome.Kind.REMOTE_FAILURE, code, details)
- elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL:
- self._protocol_manager.set_protocol_receiver(
- outcome.return_value.subscription.protocol_receiver)
- self._operator_post_create(outcome.return_value.subscription)
- else:
- # TODO(nathaniel): Support other subscriptions.
- raise ValueError(
- 'Unsupported "%s"!' % outcome.return_value.subscription.kind)
-
- def _store_advance(self, initial_metadata, payload, completion, allowance):
- if initial_metadata is not None:
- self._pending_initial_metadata = initial_metadata
- if payload is not None:
- self._pending_payloads.append(payload)
- if completion is not None:
- self._pending_completion = completion
- if allowance is not None and self._remote_allowance is not None:
- self._remote_allowance += allowance
-
- def _operator_advance(self, initial_metadata, payload, completion, allowance):
- if self._processing:
- self._store_advance(initial_metadata, payload, completion, allowance)
- else:
- action = False
- if initial_metadata is not None:
- action = True
- if payload is not None:
- if 0 < self._local_allowance:
- self._local_allowance -= 1
- action = True
- else:
- self._pending_payloads.append(payload)
- payload = False
- if completion is not None:
- if self._pending_payloads:
- self._pending_completion = completion
- else:
- action = True
- if allowance is not None and self._remote_allowance is not None:
- allowance += self._remote_allowance
- self._remote_allowance = 0
- action = True
- if action:
- self._pool.submit(
- callable_util.with_exceptions_logged(
- self._operator_process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
- self._wrapped_operator, initial_metadata, payload, completion,
- allowance)
-
- def set_group_and_method(self, group, method):
- """See _interfaces.IngestionManager.set_group_and_method for spec."""
- if self._subscription_creator is not None and not self._processing:
- self._pool.submit(
- callable_util.with_exceptions_logged(
- self._create, _constants.INTERNAL_ERROR_LOG_MESSAGE),
- self._subscription_creator, group, method)
- self._processing = True
-
- def add_local_allowance(self, allowance):
- """See _interfaces.IngestionManager.add_local_allowance for spec."""
- if any((self._subscription_creator, self._wrapped_operator,)):
- self._local_allowance += allowance
- if not self._processing:
- initial_metadata, payload, completion, allowance, moar = (
- self._operator_next())
- if moar:
- self._pool.submit(
- callable_util.with_exceptions_logged(
- self._operator_process,
- _constants.INTERNAL_ERROR_LOG_MESSAGE),
- initial_metadata, payload, completion, allowance)
-
- def local_emissions_done(self):
- self._remote_allowance = None
-
- def advance(self, initial_metadata, payload, completion, allowance):
- """See _interfaces.IngestionManager.advance for specification."""
- if self._subscription_creator is not None:
- self._store_advance(initial_metadata, payload, completion, allowance)
- elif self._wrapped_operator is not None:
- self._operator_advance(initial_metadata, payload, completion, allowance)
-
-
-def invocation_ingestion_manager(
- subscription, lock, pool, termination_manager, transmission_manager,
- expiration_manager, protocol_manager):
- """Creates an IngestionManager appropriate for invocation-side use.
-
- Args:
- subscription: A base.Subscription indicating the customer's interest in the
- data and results from the service-side of the operation.
- lock: The operation-wide lock.
- pool: A thread pool in which to execute customer code.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
- expiration_manager: The _interfaces.ExpirationManager for the operation.
- protocol_manager: The _interfaces.ProtocolManager for the operation.
-
- Returns:
- An IngestionManager appropriate for invocation-side use.
- """
- return _IngestionManager(
- lock, pool, subscription, None, termination_manager, transmission_manager,
- expiration_manager, protocol_manager)
-
-
-def service_ingestion_manager(
- servicer, operation_context, output_operator, lock, pool,
- termination_manager, transmission_manager, expiration_manager,
- protocol_manager):
- """Creates an IngestionManager appropriate for service-side use.
-
- The returned IngestionManager will require its set_group_and_name method to be
- called before its advance method may be called.
-
- Args:
- servicer: A base.Servicer for servicing the operation.
- operation_context: A base.OperationContext for the operation to be passed to
- the customer.
- output_operator: A base.Operator for the operation to be passed to the
- customer and to be called by the customer to accept operation data output
- by the customer.
- lock: The operation-wide lock.
- pool: A thread pool in which to execute customer code.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
- expiration_manager: The _interfaces.ExpirationManager for the operation.
- protocol_manager: The _interfaces.ProtocolManager for the operation.
-
- Returns:
- An IngestionManager appropriate for service-side use.
- """
- subscription_creator = _ServiceSubscriptionCreator(
- servicer, operation_context, output_operator)
- return _IngestionManager(
- lock, pool, None, subscription_creator, termination_manager,
- transmission_manager, expiration_manager, protocol_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py
deleted file mode 100644
index 63ac82f80e..0000000000
--- a/src/python/grpcio/grpc/framework/core/_interfaces.py
+++ /dev/null
@@ -1,331 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Package-internal interfaces."""
-
-import abc
-
-import six
-
-from grpc.framework.interfaces.base import base
-
-
-class TerminationManager(six.with_metaclass(abc.ABCMeta)):
- """An object responsible for handling the termination of an operation.
-
- Attributes:
- outcome: None if the operation is active or a base.Outcome value if it has
- terminated.
- """
-
- @abc.abstractmethod
- def add_callback(self, callback):
- """Registers a callback to be called on operation termination.
-
- If the operation has already terminated the callback will not be called.
-
- Args:
- callback: A callable that will be passed a base.Outcome value.
-
- Returns:
- None if the operation has not yet terminated and the passed callback will
- be called when it does, or a base.Outcome value describing the
- operation termination if the operation has terminated and the callback
- will not be called as a result of this method call.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def emission_complete(self):
- """Indicates that emissions from customer code have completed."""
- raise NotImplementedError()
-
- @abc.abstractmethod
- def transmission_complete(self):
- """Indicates that transmissions to the remote end are complete.
-
- Returns:
- True if the operation has terminated or False if the operation remains
- ongoing.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def reception_complete(self, code, details):
- """Indicates that reception from the other side is complete.
-
- Args:
- code: An application-specific code value.
- details: An application-specific details value.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def ingestion_complete(self):
- """Indicates that customer code ingestion of received values is complete."""
- raise NotImplementedError()
-
- @abc.abstractmethod
- def expire(self):
- """Indicates that the operation must abort because it has taken too long."""
- raise NotImplementedError()
-
- @abc.abstractmethod
- def abort(self, outcome):
- """Indicates that the operation must abort for the indicated reason.
-
- Args:
- outcome: A base.Outcome indicating operation abortion.
- """
- raise NotImplementedError()
-
-
-class TransmissionManager(six.with_metaclass(abc.ABCMeta)):
- """A manager responsible for transmitting to the other end of an operation."""
-
- @abc.abstractmethod
- def kick_off(
- self, group, method, timeout, protocol_options, initial_metadata,
- payload, completion, allowance):
- """Transmits the values associated with operation invocation."""
- raise NotImplementedError()
-
- @abc.abstractmethod
- def advance(self, initial_metadata, payload, completion, allowance):
- """Accepts values for transmission to the other end of the operation.
-
- Args:
- initial_metadata: An initial metadata value to be transmitted to the other
- side of the operation. May only ever be non-None once.
- payload: A payload value.
- completion: A base.Completion value. May only ever be non-None in the last
- transmission to be made to the other side.
- allowance: A positive integer communicating the number of additional
- payloads allowed to be transmitted from the other side to this side of
- the operation, or None if no additional allowance is being granted in
- this call.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def timeout(self, timeout):
- """Accepts for transmission to the other side a new timeout value.
-
- Args:
- timeout: A positive float used as the new timeout value for the operation
- to be transmitted to the other side.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def allowance(self, allowance):
- """Indicates to this manager that the remote customer is allowing payloads.
-
- Args:
- allowance: A positive integer indicating the number of additional payloads
- the remote customer is allowing to be transmitted from this side of the
- operation.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def remote_complete(self):
- """Indicates to this manager that data from the remote side is complete."""
- raise NotImplementedError()
-
- @abc.abstractmethod
- def abort(self, outcome):
- """Indicates that the operation has aborted.
-
- Args:
- outcome: A base.Outcome for the operation. If None, indicates that the
- operation abortion should not be communicated to the other side of the
- operation.
- """
- raise NotImplementedError()
-
-
-class ExpirationManager(six.with_metaclass(abc.ABCMeta)):
- """A manager responsible for aborting the operation if it runs out of time."""
-
- @abc.abstractmethod
- def change_timeout(self, timeout):
- """Changes the timeout allotted for the operation.
-
- Operation duration is always measure from the beginning of the operation;
- calling this method changes the operation's allotted time to timeout total
- seconds, not timeout seconds from the time of this method call.
-
- Args:
- timeout: A length of time in seconds to allow for the operation.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def deadline(self):
- """Returns the time until which the operation is allowed to run.
-
- Returns:
- The time (seconds since the epoch) at which the operation will expire.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def terminate(self):
- """Indicates to this manager that the operation has terminated."""
- raise NotImplementedError()
-
-
-class ProtocolManager(six.with_metaclass(abc.ABCMeta)):
- """A manager of protocol-specific values passing through an operation."""
-
- @abc.abstractmethod
- def set_protocol_receiver(self, protocol_receiver):
- """Registers the customer object that will receive protocol objects.
-
- Args:
- protocol_receiver: A base.ProtocolReceiver to which protocol objects for
- the operation should be passed.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def accept_protocol_context(self, protocol_context):
- """Accepts the protocol context object for the operation.
-
- Args:
- protocol_context: An object designated for use as the protocol context
- of the operation, with further semantics implementation-determined.
- """
- raise NotImplementedError()
-
-
-class EmissionManager(six.with_metaclass(abc.ABCMeta, base.Operator)):
- """A manager of values emitted by customer code."""
-
- @abc.abstractmethod
- def advance(
- self, initial_metadata=None, payload=None, completion=None,
- allowance=None):
- """Accepts a value emitted by customer code.
-
- This method should only be called by customer code.
-
- Args:
- initial_metadata: An initial metadata value emitted by the local customer
- to be sent to the other side of the operation.
- payload: A payload value emitted by the local customer to be sent to the
- other side of the operation.
- completion: A Completion value emitted by the local customer to be sent to
- the other side of the operation.
- allowance: A positive integer indicating an additional number of payloads
- that the local customer is willing to accept from the other side of the
- operation.
- """
- raise NotImplementedError()
-
-
-class IngestionManager(six.with_metaclass(abc.ABCMeta)):
- """A manager responsible for executing customer code.
-
- This name of this manager comes from its responsibility to pass successive
- values from the other side of the operation into the code of the local
- customer.
- """
-
- @abc.abstractmethod
- def set_group_and_method(self, group, method):
- """Communicates to this IngestionManager the operation group and method.
-
- Args:
- group: The group identifier of the operation.
- method: The method identifier of the operation.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def add_local_allowance(self, allowance):
- """Communicates to this IngestionManager that more payloads may be ingested.
-
- Args:
- allowance: A positive integer indicating an additional number of payloads
- that the local customer is willing to ingest.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def local_emissions_done(self):
- """Indicates to this manager that local emissions are done."""
- raise NotImplementedError()
-
- @abc.abstractmethod
- def advance(self, initial_metadata, payload, completion, allowance):
- """Advances the operation by passing values to the local customer."""
- raise NotImplementedError()
-
-
-class ReceptionManager(six.with_metaclass(abc.ABCMeta)):
- """A manager responsible for receiving tickets from the other end."""
-
- @abc.abstractmethod
- def receive_ticket(self, ticket):
- """Handle a ticket from the other side of the operation.
-
- Args:
- ticket: A links.Ticket for the operation.
- """
- raise NotImplementedError()
-
-
-class Operation(six.with_metaclass(abc.ABCMeta)):
- """An ongoing operation.
-
- Attributes:
- context: A base.OperationContext object for the operation.
- operator: A base.Operator object for the operation for use by the customer
- of the operation.
- """
-
- @abc.abstractmethod
- def handle_ticket(self, ticket):
- """Handle a ticket from the other side of the operation.
-
- Args:
- ticket: A links.Ticket from the other side of the operation.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def abort(self, outcome_kind):
- """Aborts the operation.
-
- Args:
- outcome_kind: A base.Outcome.Kind value indicating operation abortion.
- """
- raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py
deleted file mode 100644
index 020c0c9ed9..0000000000
--- a/src/python/grpcio/grpc/framework/core/_operation.py
+++ /dev/null
@@ -1,204 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Implementation of operations."""
-
-import threading
-
-from grpc.framework.core import _context
-from grpc.framework.core import _emission
-from grpc.framework.core import _expiration
-from grpc.framework.core import _ingestion
-from grpc.framework.core import _interfaces
-from grpc.framework.core import _protocol
-from grpc.framework.core import _reception
-from grpc.framework.core import _termination
-from grpc.framework.core import _transmission
-from grpc.framework.core import _utilities
-
-
-class _EasyOperation(_interfaces.Operation):
- """A trivial implementation of interfaces.Operation."""
-
- def __init__(
- self, lock, termination_manager, transmission_manager, expiration_manager,
- context, operator, reception_manager):
- """Constructor.
-
- Args:
- lock: The operation-wide lock.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
- expiration_manager: The _interfaces.ExpirationManager for the operation.
- context: A base.OperationContext for use by the customer during the
- operation.
- operator: A base.Operator for use by the customer during the operation.
- reception_manager: The _interfaces.ReceptionManager for the operation.
- """
- self._lock = lock
- self._termination_manager = termination_manager
- self._transmission_manager = transmission_manager
- self._expiration_manager = expiration_manager
- self._reception_manager = reception_manager
-
- self.context = context
- self.operator = operator
-
- def handle_ticket(self, ticket):
- with self._lock:
- self._reception_manager.receive_ticket(ticket)
-
- def abort(self, outcome_kind):
- with self._lock:
- if self._termination_manager.outcome is None:
- outcome = _utilities.Outcome(outcome_kind, None, None)
- self._termination_manager.abort(outcome)
- self._transmission_manager.abort(outcome)
- self._expiration_manager.terminate()
-
-
-def invocation_operate(
- operation_id, group, method, subscription, timeout, protocol_options,
- initial_metadata, payload, completion, ticket_sink, termination_action,
- pool):
- """Constructs objects necessary for front-side operation management.
-
- Args:
- operation_id: An object identifying the operation.
- group: The group identifier of the operation.
- method: The method identifier of the operation.
- subscription: A base.Subscription describing the customer's interest in the
- results of the operation.
- timeout: A length of time in seconds to allow for the operation.
- protocol_options: A transport-specific, application-specific, and/or
- protocol-specific value relating to the invocation. May be None.
- initial_metadata: An initial metadata value to be sent to the other side of
- the operation. May be None if the initial metadata will be passed later or
- if there will be no initial metadata passed at all.
- payload: The first payload value to be transmitted to the other side. May be
- None if there is no such value or if the customer chose not to pass it at
- operation invocation.
- completion: A base.Completion value indicating the end of values passed to
- the other side of the operation.
- ticket_sink: A callable that accepts links.Tickets and delivers them to the
- other side of the operation.
- termination_action: A callable that accepts the outcome of the operation as
- a base.Outcome value to be called on operation completion.
- pool: A thread pool with which to do the work of the operation.
-
- Returns:
- An _interfaces.Operation for the operation.
- """
- lock = threading.Lock()
- with lock:
- termination_manager = _termination.invocation_termination_manager(
- termination_action, pool)
- transmission_manager = _transmission.TransmissionManager(
- operation_id, ticket_sink, lock, pool, termination_manager)
- expiration_manager = _expiration.invocation_expiration_manager(
- timeout, lock, termination_manager, transmission_manager)
- protocol_manager = _protocol.invocation_protocol_manager(
- subscription, lock, pool, termination_manager, transmission_manager,
- expiration_manager)
- operation_context = _context.OperationContext(
- lock, termination_manager, transmission_manager, expiration_manager)
- emission_manager = _emission.EmissionManager(
- lock, termination_manager, transmission_manager, expiration_manager)
- ingestion_manager = _ingestion.invocation_ingestion_manager(
- subscription, lock, pool, termination_manager, transmission_manager,
- expiration_manager, protocol_manager)
- reception_manager = _reception.ReceptionManager(
- termination_manager, transmission_manager, expiration_manager,
- protocol_manager, ingestion_manager)
-
- termination_manager.set_expiration_manager(expiration_manager)
- transmission_manager.set_expiration_manager(expiration_manager)
- emission_manager.set_ingestion_manager(ingestion_manager)
-
- transmission_manager.kick_off(
- group, method, timeout, protocol_options, initial_metadata, payload,
- completion, None)
-
- return _EasyOperation(
- lock, termination_manager, transmission_manager, expiration_manager,
- operation_context, emission_manager, reception_manager)
-
-
-def service_operate(
- servicer_package, ticket, ticket_sink, termination_action, pool):
- """Constructs an Operation for service of an operation.
-
- Args:
- servicer_package: A _utilities.ServicerPackage to be used servicing the
- operation.
- ticket: The first links.Ticket received for the operation.
- ticket_sink: A callable that accepts links.Tickets and delivers them to the
- other side of the operation.
- termination_action: A callable that accepts the outcome of the operation as
- a base.Outcome value to be called on operation completion.
- pool: A thread pool with which to do the work of the operation.
-
- Returns:
- An _interfaces.Operation for the operation.
- """
- lock = threading.Lock()
- with lock:
- termination_manager = _termination.service_termination_manager(
- termination_action, pool)
- transmission_manager = _transmission.TransmissionManager(
- ticket.operation_id, ticket_sink, lock, pool, termination_manager)
- expiration_manager = _expiration.service_expiration_manager(
- ticket.timeout, servicer_package.default_timeout,
- servicer_package.maximum_timeout, lock, termination_manager,
- transmission_manager)
- protocol_manager = _protocol.service_protocol_manager(
- lock, pool, termination_manager, transmission_manager,
- expiration_manager)
- operation_context = _context.OperationContext(
- lock, termination_manager, transmission_manager, expiration_manager)
- emission_manager = _emission.EmissionManager(
- lock, termination_manager, transmission_manager, expiration_manager)
- ingestion_manager = _ingestion.service_ingestion_manager(
- servicer_package.servicer, operation_context, emission_manager, lock,
- pool, termination_manager, transmission_manager, expiration_manager,
- protocol_manager)
- reception_manager = _reception.ReceptionManager(
- termination_manager, transmission_manager, expiration_manager,
- protocol_manager, ingestion_manager)
-
- termination_manager.set_expiration_manager(expiration_manager)
- transmission_manager.set_expiration_manager(expiration_manager)
- emission_manager.set_ingestion_manager(ingestion_manager)
-
- reception_manager.receive_ticket(ticket)
-
- return _EasyOperation(
- lock, termination_manager, transmission_manager, expiration_manager,
- operation_context, emission_manager, reception_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_protocol.py b/src/python/grpcio/grpc/framework/core/_protocol.py
deleted file mode 100644
index 3177b5e302..0000000000
--- a/src/python/grpcio/grpc/framework/core/_protocol.py
+++ /dev/null
@@ -1,176 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""State and behavior for passing protocol objects in an operation."""
-
-import collections
-import enum
-
-from grpc.framework.core import _constants
-from grpc.framework.core import _interfaces
-from grpc.framework.core import _utilities
-from grpc.framework.foundation import callable_util
-from grpc.framework.interfaces.base import base
-
-_EXCEPTION_LOG_MESSAGE = 'Exception delivering protocol object!'
-
-_LOCAL_FAILURE_OUTCOME = _utilities.Outcome(
- base.Outcome.Kind.LOCAL_FAILURE, None, None)
-
-
-class _Awaited(
- collections.namedtuple('_Awaited', ('kind', 'value',))):
-
- @enum.unique
- class Kind(enum.Enum):
- NOT_YET_ARRIVED = 'not yet arrived'
- ARRIVED = 'arrived'
-
-_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None)
-_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None)
-
-
-class _Transitory(
- collections.namedtuple('_Transitory', ('kind', 'value',))):
-
- @enum.unique
- class Kind(enum.Enum):
- NOT_YET_SEEN = 'not yet seen'
- PRESENT = 'present'
- GONE = 'gone'
-
-_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None)
-_GONE = _Transitory(_Transitory.Kind.GONE, None)
-
-
-class _ProtocolManager(_interfaces.ProtocolManager):
- """An implementation of _interfaces.ExpirationManager."""
-
- def __init__(
- self, protocol_receiver, lock, pool, termination_manager,
- transmission_manager, expiration_manager):
- """Constructor.
-
- Args:
- protocol_receiver: An _Awaited wrapping of the base.ProtocolReceiver to
- which protocol objects should be passed during the operation. May be
- of kind _Awaited.Kind.NOT_YET_ARRIVED if the customer's subscription is
- not yet known and may be of kind _Awaited.Kind.ARRIVED but with a value
- of None if the customer's subscription did not include a
- ProtocolReceiver.
- lock: The operation-wide lock.
- pool: A thread pool.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
- expiration_manager: The _interfaces.ExpirationManager for the operation.
- """
- self._lock = lock
- self._pool = pool
- self._termination_manager = termination_manager
- self._transmission_manager = transmission_manager
- self._expiration_manager = expiration_manager
-
- self._protocol_receiver = protocol_receiver
- self._context = _NOT_YET_SEEN
-
- def _abort_and_notify(self, outcome):
- if self._termination_manager.outcome is None:
- self._termination_manager.abort(outcome)
- self._transmission_manager.abort(outcome)
- self._expiration_manager.terminate()
-
- def _deliver(self, behavior, value):
- def deliver():
- delivery_outcome = callable_util.call_logging_exceptions(
- behavior, _EXCEPTION_LOG_MESSAGE, value)
- if delivery_outcome.kind is callable_util.Outcome.Kind.RAISED:
- with self._lock:
- self._abort_and_notify(_LOCAL_FAILURE_OUTCOME)
- self._pool.submit(
- callable_util.with_exceptions_logged(
- deliver, _constants.INTERNAL_ERROR_LOG_MESSAGE))
-
- def set_protocol_receiver(self, protocol_receiver):
- """See _interfaces.ProtocolManager.set_protocol_receiver for spec."""
- self._protocol_receiver = _Awaited(_Awaited.Kind.ARRIVED, protocol_receiver)
- if (self._context.kind is _Transitory.Kind.PRESENT and
- protocol_receiver is not None):
- self._deliver(protocol_receiver.context, self._context.value)
- self._context = _GONE
-
- def accept_protocol_context(self, protocol_context):
- """See _interfaces.ProtocolManager.accept_protocol_context for spec."""
- if self._protocol_receiver.kind is _Awaited.Kind.ARRIVED:
- if self._protocol_receiver.value is not None:
- self._deliver(self._protocol_receiver.value.context, protocol_context)
- self._context = _GONE
- else:
- self._context = _Transitory(_Transitory.Kind.PRESENT, protocol_context)
-
-
-def invocation_protocol_manager(
- subscription, lock, pool, termination_manager, transmission_manager,
- expiration_manager):
- """Creates an _interfaces.ProtocolManager for invocation-side use.
-
- Args:
- subscription: The local customer's subscription to the operation.
- lock: The operation-wide lock.
- pool: A thread pool.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
- expiration_manager: The _interfaces.ExpirationManager for the operation.
- """
- if subscription.kind is base.Subscription.Kind.FULL:
- awaited_protocol_receiver = _Awaited(
- _Awaited.Kind.ARRIVED, subscription.protocol_receiver)
- else:
- awaited_protocol_receiver = _ARRIVED_AND_NONE
- return _ProtocolManager(
- awaited_protocol_receiver, lock, pool, termination_manager,
- transmission_manager, expiration_manager)
-
-
-def service_protocol_manager(
- lock, pool, termination_manager, transmission_manager, expiration_manager):
- """Creates an _interfaces.ProtocolManager for service-side use.
-
- Args:
- lock: The operation-wide lock.
- pool: A thread pool.
- termination_manager: The _interfaces.TerminationManager for the operation.
- transmission_manager: The _interfaces.TransmissionManager for the
- operation.
- expiration_manager: The _interfaces.ExpirationManager for the operation.
- """
- return _ProtocolManager(
- _NOT_YET_ARRIVED, lock, pool, termination_manager, transmission_manager,
- expiration_manager)
diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py
deleted file mode 100644
index ff81450dee..0000000000
--- a/src/python/grpcio/grpc/framework/core/_reception.py
+++ /dev/null
@@ -1,159 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""State and behavior for ticket reception."""
-
-from grpc.framework.core import _interfaces
-from grpc.framework.core import _utilities
-from grpc.framework.interfaces.base import base
-from grpc.framework.interfaces.base import utilities
-from grpc.framework.interfaces.links import links
-
-_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME_KIND = {
- links.Ticket.Termination.CANCELLATION: base.Outcome.Kind.CANCELLED,
- links.Ticket.Termination.EXPIRATION: base.Outcome.Kind.EXPIRED,
- links.Ticket.Termination.SHUTDOWN: base.Outcome.Kind.REMOTE_SHUTDOWN,
- links.Ticket.Termination.RECEPTION_FAILURE:
- base.Outcome.Kind.RECEPTION_FAILURE,
- links.Ticket.Termination.TRANSMISSION_FAILURE:
- base.Outcome.Kind.TRANSMISSION_FAILURE,
- links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.Kind.REMOTE_FAILURE,
- links.Ticket.Termination.REMOTE_FAILURE: base.Outcome.Kind.LOCAL_FAILURE,
-}
-
-_RECEPTION_FAILURE_OUTCOME = _utilities.Outcome(
- base.Outcome.Kind.RECEPTION_FAILURE, None, None)
-
-
-def _carrying_protocol_context(ticket):
- return ticket.protocol is not None and ticket.protocol.kind in (
- links.Protocol.Kind.INVOCATION_CONTEXT,
- links.Protocol.Kind.SERVICER_CONTEXT,)
-
-
-class ReceptionManager(_interfaces.ReceptionManager):
- """A ReceptionManager based around a _Receiver passed to it."""
-
- def __init__(
- self, termination_manager, transmission_manager, expiration_manager,
- protocol_manager, ingestion_manager):
- """Constructor.
-
- Args:
- termination_manager: The operation's _interfaces.TerminationManager.
- transmission_manager: The operation's _interfaces.TransmissionManager.
- expiration_manager: The operation's _interfaces.ExpirationManager.
- protocol_manager: The operation's _interfaces.ProtocolManager.
- ingestion_manager: The operation's _interfaces.IngestionManager.
- """
- self._termination_manager = termination_manager
- self._transmission_manager = transmission_manager
- self._expiration_manager = expiration_manager
- self._protocol_manager = protocol_manager
- self._ingestion_manager = ingestion_manager
-
- self._lowest_unseen_sequence_number = 0
- self._out_of_sequence_tickets = {}
- self._aborted = False
-
- def _abort(self, outcome):
- self._aborted = True
- if self._termination_manager.outcome is None:
- self._termination_manager.abort(outcome)
- self._transmission_manager.abort(None)
- self._expiration_manager.terminate()
-
- def _sequence_failure(self, ticket):
- """Determines a just-arrived ticket's sequential legitimacy.
-
- Args:
- ticket: A just-arrived ticket.
-
- Returns:
- True if the ticket is sequentially legitimate; False otherwise.
- """
- if ticket.sequence_number < self._lowest_unseen_sequence_number:
- return True
- elif ticket.sequence_number in self._out_of_sequence_tickets:
- return True
- else:
- return False
-
- def _process_one(self, ticket):
- if ticket.sequence_number == 0:
- self._ingestion_manager.set_group_and_method(ticket.group, ticket.method)
- if _carrying_protocol_context(ticket):
- self._protocol_manager.accept_protocol_context(ticket.protocol.value)
- else:
- self._protocol_manager.accept_protocol_context(None)
- if ticket.timeout is not None:
- self._expiration_manager.change_timeout(ticket.timeout)
- if ticket.termination is None:
- completion = None
- else:
- completion = utilities.completion(
- ticket.terminal_metadata, ticket.code, ticket.message)
- self._termination_manager.reception_complete(ticket.code, ticket.message)
- self._ingestion_manager.advance(
- ticket.initial_metadata, ticket.payload, completion, ticket.allowance)
- if ticket.allowance is not None:
- self._transmission_manager.allowance(ticket.allowance)
-
- def _process(self, ticket):
- """Process those tickets ready to be processed.
-
- Args:
- ticket: A just-arrived ticket the sequence number of which matches this
- _ReceptionManager's _lowest_unseen_sequence_number field.
- """
- while True:
- self._process_one(ticket)
- next_ticket = self._out_of_sequence_tickets.pop(
- ticket.sequence_number + 1, None)
- if next_ticket is None:
- self._lowest_unseen_sequence_number = ticket.sequence_number + 1
- return
- else:
- ticket = next_ticket
-
- def receive_ticket(self, ticket):
- """See _interfaces.ReceptionManager.receive_ticket for specification."""
- if self._aborted:
- return
- elif self._sequence_failure(ticket):
- self._abort(_RECEPTION_FAILURE_OUTCOME)
- elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION):
- outcome_kind = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME_KIND[
- ticket.termination]
- self._abort(
- _utilities.Outcome(outcome_kind, ticket.code, ticket.message))
- elif ticket.sequence_number == self._lowest_unseen_sequence_number:
- self._process(ticket)
- else:
- self._out_of_sequence_tickets[ticket.sequence_number] = ticket
diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py
deleted file mode 100644
index fff3a3fc14..0000000000
--- a/src/python/grpcio/grpc/framework/core/_termination.py
+++ /dev/null
@@ -1,229 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""State and behavior for operation termination."""
-
-import abc
-
-import six
-
-from grpc.framework.core import _constants
-from grpc.framework.core import _interfaces
-from grpc.framework.core import _utilities
-from grpc.framework.foundation import callable_util
-from grpc.framework.interfaces.base import base
-
-
-def _invocation_completion_predicate(
- unused_emission_complete, unused_transmission_complete,
- unused_reception_complete, ingestion_complete):
- return ingestion_complete
-
-
-def _service_completion_predicate(
- unused_emission_complete, transmission_complete, unused_reception_complete,
- ingestion_complete):
- return transmission_complete and ingestion_complete
-
-
-class TerminationManager(six.with_metaclass(abc.ABCMeta, _interfaces.TerminationManager)):
- """A _interfaces.TransmissionManager on which another manager may be set."""
-
- @abc.abstractmethod
- def set_expiration_manager(self, expiration_manager):
- """Sets the expiration manager with which this manager will interact.
-
- Args:
- expiration_manager: The _interfaces.ExpirationManager associated with the
- current operation.
- """
- raise NotImplementedError()
-
-
-class _TerminationManager(TerminationManager):
- """An implementation of TerminationManager."""
-
- def __init__(self, predicate, action, pool):
- """Constructor.
-
- Args:
- predicate: One of _invocation_completion_predicate or
- _service_completion_predicate to be used to determine when the operation
- has completed.
- action: A behavior to pass the operation outcome's kind on operation
- termination.
- pool: A thread pool.
- """
- self._predicate = predicate
- self._action = action
- self._pool = pool
- self._expiration_manager = None
-
- self._callbacks = []
-
- self._code = None
- self._details = None
- self._emission_complete = False
- self._transmission_complete = False
- self._reception_complete = False
- self._ingestion_complete = False
-
- # The None-ness of outcome is the operation-wide record of whether and how
- # the operation has terminated.
- self.outcome = None
-
- def set_expiration_manager(self, expiration_manager):
- self._expiration_manager = expiration_manager
-
- def _terminate_internal_only(self, outcome):
- """Terminates the operation.
-
- Args:
- outcome: A base.Outcome describing the outcome of the operation.
- """
- self.outcome = outcome
- callbacks = list(self._callbacks)
- self._callbacks = None
-
- act = callable_util.with_exceptions_logged(
- self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE)
-
- # TODO(issue 3202): Don't call the local application's callbacks if it has
- # previously shown a programming defect.
- if False and outcome.kind is base.Outcome.Kind.LOCAL_FAILURE:
- self._pool.submit(act, base.Outcome.Kind.LOCAL_FAILURE)
- else:
- def call_callbacks_and_act(callbacks, outcome):
- for callback in callbacks:
- callback_outcome = callable_util.call_logging_exceptions(
- callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE,
- outcome)
- if callback_outcome.exception is not None:
- act_outcome_kind = base.Outcome.Kind.LOCAL_FAILURE
- break
- else:
- act_outcome_kind = outcome.kind
- act(act_outcome_kind)
-
- self._pool.submit(
- callable_util.with_exceptions_logged(
- call_callbacks_and_act, _constants.INTERNAL_ERROR_LOG_MESSAGE),
- callbacks, outcome)
-
- def _terminate_and_notify(self, outcome):
- self._terminate_internal_only(outcome)
- self._expiration_manager.terminate()
-
- def _perhaps_complete(self):
- if self._predicate(
- self._emission_complete, self._transmission_complete,
- self._reception_complete, self._ingestion_complete):
- self._terminate_and_notify(
- _utilities.Outcome(
- base.Outcome.Kind.COMPLETED, self._code, self._details))
- return True
- else:
- return False
-
- def is_active(self):
- """See _interfaces.TerminationManager.is_active for specification."""
- return self.outcome is None
-
- def add_callback(self, callback):
- """See _interfaces.TerminationManager.add_callback for specification."""
- if self.outcome is None:
- self._callbacks.append(callback)
- return None
- else:
- return self.outcome
-
- def emission_complete(self):
- """See superclass method for specification."""
- if self.outcome is None:
- self._emission_complete = True
- self._perhaps_complete()
-
- def transmission_complete(self):
- """See superclass method for specification."""
- if self.outcome is None:
- self._transmission_complete = True
- return self._perhaps_complete()
- else:
- return False
-
- def reception_complete(self, code, details):
- """See superclass method for specification."""
- if self.outcome is None:
- self._reception_complete = True
- self._code = code
- self._details = details
- self._perhaps_complete()
-
- def ingestion_complete(self):
- """See superclass method for specification."""
- if self.outcome is None:
- self._ingestion_complete = True
- self._perhaps_complete()
-
- def expire(self):
- """See _interfaces.TerminationManager.expire for specification."""
- self._terminate_internal_only(
- _utilities.Outcome(base.Outcome.Kind.EXPIRED, None, None))
-
- def abort(self, outcome):
- """See _interfaces.TerminationManager.abort for specification."""
- self._terminate_and_notify(outcome)
-
-
-def invocation_termination_manager(action, pool):
- """Creates a TerminationManager appropriate for invocation-side use.
-
- Args:
- action: An action to call on operation termination.
- pool: A thread pool in which to execute the passed action and any
- termination callbacks that are registered during the operation.
-
- Returns:
- A TerminationManager appropriate for invocation-side use.
- """
- return _TerminationManager(_invocation_completion_predicate, action, pool)
-
-
-def service_termination_manager(action, pool):
- """Creates a TerminationManager appropriate for service-side use.
-
- Args:
- action: An action to call on operation termination.
- pool: A thread pool in which to execute the passed action and any
- termination callbacks that are registered during the operation.
-
- Returns:
- A TerminationManager appropriate for service-side use.
- """
- return _TerminationManager(_service_completion_predicate, action, pool)
diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py
deleted file mode 100644
index 65b12c4160..0000000000
--- a/src/python/grpcio/grpc/framework/core/_transmission.py
+++ /dev/null
@@ -1,335 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""State and behavior for ticket transmission during an operation."""
-
-import collections
-import enum
-
-from grpc.framework.core import _constants
-from grpc.framework.core import _interfaces
-from grpc.framework.core import _utilities
-from grpc.framework.foundation import callable_util
-from grpc.framework.interfaces.base import base
-from grpc.framework.interfaces.links import links
-
-_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
-
-_TRANSMISSION_FAILURE_OUTCOME = _utilities.Outcome(
- base.Outcome.Kind.TRANSMISSION_FAILURE, None, None)
-
-
-def _explode_completion(completion):
- if completion is None:
- return None, None, None, None
- else:
- return (
- completion.terminal_metadata, completion.code, completion.message,
- links.Ticket.Termination.COMPLETION)
-
-
-class _Abort(
- collections.namedtuple(
- '_Abort', ('kind', 'termination', 'code', 'details',))):
- """Tracks whether the operation aborted and what is to be done about it.
-
- Attributes:
- kind: A Kind value describing the overall kind of the _Abort.
- termination: A links.Ticket.Termination value to be sent to the other side
- of the operation. Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
- code: A code value to be sent to the other side of the operation. Only
- valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
- details: A details value to be sent to the other side of the operation.
- Only valid if kind is Kind.ABORTED_NOTIFY_NEEDED.
- """
-
- @enum.unique
- class Kind(enum.Enum):
- NOT_ABORTED = 'not aborted'
- ABORTED_NOTIFY_NEEDED = 'aborted notify needed'
- ABORTED_NO_NOTIFY = 'aborted no notify'
-
-_NOT_ABORTED = _Abort(_Abort.Kind.NOT_ABORTED, None, None, None)
-_ABORTED_NO_NOTIFY = _Abort(_Abort.Kind.ABORTED_NO_NOTIFY, None, None, None)
-
-
-class TransmissionManager(_interfaces.TransmissionManager):
- """An _interfaces.TransmissionManager that sends links.Tickets."""
-
- def __init__(
- self, operation_id, ticket_sink, lock, pool, termination_manager):
- """Constructor.
-
- Args:
- operation_id: The operation's ID.
- ticket_sink: A callable that accepts tickets and sends them to the other
- side of the operation.
- lock: The operation-servicing-wide lock object.
- pool: A thread pool in which the work of transmitting tickets will be
- performed.
- termination_manager: The _interfaces.TerminationManager associated with
- this operation.
- """
- self._lock = lock
- self._pool = pool
- self._ticket_sink = ticket_sink
- self._operation_id = operation_id
- self._termination_manager = termination_manager
- self._expiration_manager = None
-
- self._lowest_unused_sequence_number = 0
- self._remote_allowance = 1
- self._remote_complete = False
- self._timeout = None
- self._local_allowance = 0
- self._initial_metadata = None
- self._payloads = []
- self._completion = None
- self._abort = _NOT_ABORTED
- self._transmitting = False
-
- def set_expiration_manager(self, expiration_manager):
- """Sets the ExpirationManager with which this manager will cooperate."""
- self._expiration_manager = expiration_manager
-
- def _next_ticket(self):
- """Creates the next ticket to be transmitted.
-
- Returns:
- A links.Ticket to be sent to the other side of the operation or None if
- there is nothing to be sent at this time.
- """
- if self._abort.kind is _Abort.Kind.ABORTED_NO_NOTIFY:
- return None
- elif self._abort.kind is _Abort.Kind.ABORTED_NOTIFY_NEEDED:
- termination = self._abort.termination
- code, details = self._abort.code, self._abort.details
- self._abort = _ABORTED_NO_NOTIFY
- return links.Ticket(
- self._operation_id, self._lowest_unused_sequence_number, None, None,
- None, None, None, None, None, None, code, details, termination, None)
-
- action = False
- # TODO(nathaniel): Support other subscriptions.
- local_subscription = links.Ticket.Subscription.FULL
- timeout = self._timeout
- if timeout is not None:
- self._timeout = None
- action = True
- if self._local_allowance <= 0:
- allowance = None
- else:
- allowance = self._local_allowance
- self._local_allowance = 0
- action = True
- initial_metadata = self._initial_metadata
- if initial_metadata is not None:
- self._initial_metadata = None
- action = True
- if not self._payloads or self._remote_allowance <= 0:
- payload = None
- else:
- payload = self._payloads.pop(0)
- self._remote_allowance -= 1
- action = True
- if self._completion is None or self._payloads:
- terminal_metadata, code, message, termination = None, None, None, None
- else:
- terminal_metadata, code, message, termination = _explode_completion(
- self._completion)
- self._completion = None
- action = True
-
- if action:
- ticket = links.Ticket(
- self._operation_id, self._lowest_unused_sequence_number, None, None,
- local_subscription, timeout, allowance, initial_metadata, payload,
- terminal_metadata, code, message, termination, None)
- self._lowest_unused_sequence_number += 1
- return ticket
- else:
- return None
-
- def _transmit(self, ticket):
- """Commences the transmission loop sending tickets.
-
- Args:
- ticket: A links.Ticket to be sent to the other side of the operation.
- """
- def transmit(ticket):
- while True:
- transmission_outcome = callable_util.call_logging_exceptions(
- self._ticket_sink, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket)
- if transmission_outcome.exception is None:
- with self._lock:
- if ticket.termination is links.Ticket.Termination.COMPLETION:
- self._termination_manager.transmission_complete()
- ticket = self._next_ticket()
- if ticket is None:
- self._transmitting = False
- return
- else:
- with self._lock:
- self._abort = _ABORTED_NO_NOTIFY
- if self._termination_manager.outcome is None:
- self._termination_manager.abort(_TRANSMISSION_FAILURE_OUTCOME)
- self._expiration_manager.terminate()
- return
-
- self._pool.submit(callable_util.with_exceptions_logged(
- transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket)
- self._transmitting = True
-
- def kick_off(
- self, group, method, timeout, protocol_options, initial_metadata,
- payload, completion, allowance):
- """See _interfaces.TransmissionManager.kickoff for specification."""
- # TODO(nathaniel): Support other subscriptions.
- subscription = links.Ticket.Subscription.FULL
- terminal_metadata, code, message, termination = _explode_completion(
- completion)
- self._remote_allowance = 1 if payload is None else 0
- protocol = links.Protocol(links.Protocol.Kind.CALL_OPTION, protocol_options)
- ticket = links.Ticket(
- self._operation_id, 0, group, method, subscription, timeout, allowance,
- initial_metadata, payload, terminal_metadata, code, message,
- termination, protocol)
- self._lowest_unused_sequence_number = 1
- self._transmit(ticket)
-
- def advance(self, initial_metadata, payload, completion, allowance):
- """See _interfaces.TransmissionManager.advance for specification."""
- if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
- return
-
- effective_initial_metadata = initial_metadata
- effective_payload = payload
- effective_completion = completion
- if allowance is not None and not self._remote_complete:
- effective_allowance = allowance
- else:
- effective_allowance = None
- if self._transmitting:
- if effective_initial_metadata is not None:
- self._initial_metadata = effective_initial_metadata
- if effective_payload is not None:
- self._payloads.append(effective_payload)
- if effective_completion is not None:
- self._completion = effective_completion
- if effective_allowance is not None:
- self._local_allowance += effective_allowance
- else:
- if effective_payload is not None:
- if 0 < self._remote_allowance:
- ticket_payload = effective_payload
- self._remote_allowance -= 1
- else:
- self._payloads.append(effective_payload)
- ticket_payload = None
- else:
- ticket_payload = None
- if effective_completion is not None and not self._payloads:
- ticket_completion = effective_completion
- else:
- self._completion = effective_completion
- ticket_completion = None
- if any(
- (effective_initial_metadata, ticket_payload, ticket_completion,
- effective_allowance)):
- terminal_metadata, code, message, termination = _explode_completion(
- completion)
- ticket = links.Ticket(
- self._operation_id, self._lowest_unused_sequence_number, None, None,
- None, None, allowance, effective_initial_metadata, ticket_payload,
- terminal_metadata, code, message, termination, None)
- self._lowest_unused_sequence_number += 1
- self._transmit(ticket)
-
- def timeout(self, timeout):
- """See _interfaces.TransmissionManager.timeout for specification."""
- if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
- return
- elif self._transmitting:
- self._timeout = timeout
- else:
- ticket = links.Ticket(
- self._operation_id, self._lowest_unused_sequence_number, None, None,
- None, timeout, None, None, None, None, None, None, None, None)
- self._lowest_unused_sequence_number += 1
- self._transmit(ticket)
-
- def allowance(self, allowance):
- """See _interfaces.TransmissionManager.allowance for specification."""
- if self._abort.kind is not _Abort.Kind.NOT_ABORTED:
- return
- elif self._transmitting or not self._payloads:
- self._remote_allowance += allowance
- else:
- self._remote_allowance += allowance - 1
- payload = self._payloads.pop(0)
- if self._payloads:
- completion = None
- else:
- completion = self._completion
- self._completion = None
- terminal_metadata, code, message, termination = _explode_completion(
- completion)
- ticket = links.Ticket(
- self._operation_id, self._lowest_unused_sequence_number, None, None,
- None, None, None, None, payload, terminal_metadata, code, message,
- termination, None)
- self._lowest_unused_sequence_number += 1
- self._transmit(ticket)
-
- def remote_complete(self):
- """See _interfaces.TransmissionManager.remote_complete for specification."""
- self._remote_complete = True
- self._local_allowance = 0
-
- def abort(self, outcome):
- """See _interfaces.TransmissionManager.abort for specification."""
- if self._abort.kind is _Abort.Kind.NOT_ABORTED:
- if outcome is None:
- self._abort = _ABORTED_NO_NOTIFY
- else:
- termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION.get(
- outcome.kind)
- if termination is None:
- self._abort = _ABORTED_NO_NOTIFY
- elif self._transmitting:
- self._abort = _Abort(
- _Abort.Kind.ABORTED_NOTIFY_NEEDED, termination, outcome.code,
- outcome.details)
- else:
- ticket = links.Ticket(
- self._operation_id, self._lowest_unused_sequence_number, None,
- None, None, None, None, None, None, None, outcome.code,
- outcome.details, termination, None)
- self._transmit(ticket)
- self._abort = _ABORTED_NO_NOTIFY
diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py
deleted file mode 100644
index abedc727e4..0000000000
--- a/src/python/grpcio/grpc/framework/core/_utilities.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Package-internal utilities."""
-
-import collections
-
-from grpc.framework.interfaces.base import base
-
-
-class ServicerPackage(
- collections.namedtuple(
- 'ServicerPackage', ('servicer', 'default_timeout', 'maximum_timeout'))):
- """A trivial bundle class.
-
- Attributes:
- servicer: A base.Servicer.
- default_timeout: A float indicating the length of time in seconds to allow
- for an operation invoked without a timeout.
- maximum_timeout: A float indicating the maximum length of time in seconds to
- allow for an operation.
- """
-
-
-class Outcome(
- base.Outcome,
- collections.namedtuple('Outcome', ('kind', 'code', 'details',))):
- """A trivial implementation of base.Outcome."""
diff --git a/src/python/grpcio/grpc/framework/core/implementations.py b/src/python/grpcio/grpc/framework/core/implementations.py
deleted file mode 100644
index 364a7faed4..0000000000
--- a/src/python/grpcio/grpc/framework/core/implementations.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Entry points into the ticket-exchange-based base layer implementation."""
-
-# base and links are referenced from specification in this module.
-from grpc.framework.core import _end
-from grpc.framework.interfaces.base import base # pylint: disable=unused-import
-from grpc.framework.interfaces.links import links # pylint: disable=unused-import
-
-
-def invocation_end_link():
- """Creates a base.End-links.Link suitable for operation invocation.
-
- Returns:
- An object that is both a base.End and a links.Link, that supports operation
- invocation, and that translates operation invocation into ticket exchange.
- """
- return _end.serviceless_end_link()
-
-
-def service_end_link(servicer, default_timeout, maximum_timeout):
- """Creates a base.End-links.Link suitable for operation service.
-
- Args:
- servicer: A base.Servicer for servicing operations.
- default_timeout: A length of time in seconds to be used as the default
- time alloted for a single operation.
- maximum_timeout: A length of time in seconds to be used as the maximum
- time alloted for a single operation.
-
- Returns:
- An object that is both a base.End and a links.Link and that services
- operations that arrive at it through ticket exchange.
- """
- return _end.serviceful_end_link(servicer, default_timeout, maximum_timeout)
diff --git a/src/python/grpcio/grpc/framework/crust/__init__.py b/src/python/grpcio/grpc/framework/crust/__init__.py
deleted file mode 100644
index 7086519106..0000000000
--- a/src/python/grpcio/grpc/framework/crust/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/grpcio/grpc/framework/crust/_calls.py b/src/python/grpcio/grpc/framework/crust/_calls.py
deleted file mode 100644
index bff940d747..0000000000
--- a/src/python/grpcio/grpc/framework/crust/_calls.py
+++ /dev/null
@@ -1,223 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Utility functions for invoking RPCs."""
-
-from grpc.framework.crust import _control
-from grpc.framework.interfaces.base import utilities
-from grpc.framework.interfaces.face import face
-
-_ITERATOR_EXCEPTION_LOG_MESSAGE = 'Exception iterating over requests!'
-
-_EMPTY_COMPLETION = utilities.completion(None, None, None)
-
-
-def _invoke(
- end, group, method, timeout, protocol_options, initial_metadata, payload,
- complete):
- rendezvous = _control.Rendezvous(None, None)
- subscription = utilities.full_subscription(
- rendezvous, _control.protocol_receiver(rendezvous))
- operation_context, operator = end.operate(
- group, method, subscription, timeout, protocol_options=protocol_options,
- initial_metadata=initial_metadata, payload=payload,
- completion=_EMPTY_COMPLETION if complete else None)
- rendezvous.set_operator_and_context(operator, operation_context)
- outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
- if outcome is not None:
- rendezvous.set_outcome(outcome)
- return rendezvous, operation_context, outcome
-
-
-def _event_return_unary(
- receiver, abortion_callback, rendezvous, operation_context, outcome, pool):
- if outcome is None:
- def in_pool():
- abortion = rendezvous.add_abortion_callback(abortion_callback)
- if abortion is None:
- try:
- receiver.initial_metadata(rendezvous.initial_metadata())
- receiver.response(next(rendezvous))
- receiver.complete(
- rendezvous.terminal_metadata(), rendezvous.code(),
- rendezvous.details())
- except face.AbortionError:
- pass
- else:
- abortion_callback(abortion)
- pool.submit(_control.pool_wrap(in_pool, operation_context))
- return rendezvous
-
-
-def _event_return_stream(
- receiver, abortion_callback, rendezvous, operation_context, outcome, pool):
- if outcome is None:
- def in_pool():
- abortion = rendezvous.add_abortion_callback(abortion_callback)
- if abortion is None:
- try:
- receiver.initial_metadata(rendezvous.initial_metadata())
- for response in rendezvous:
- receiver.response(response)
- receiver.complete(
- rendezvous.terminal_metadata(), rendezvous.code(),
- rendezvous.details())
- except face.AbortionError:
- pass
- else:
- abortion_callback(abortion)
- pool.submit(_control.pool_wrap(in_pool, operation_context))
- return rendezvous
-
-
-def blocking_unary_unary(
- end, group, method, timeout, with_call, protocol_options, initial_metadata,
- payload):
- """Services in a blocking fashion a unary-unary servicer method."""
- rendezvous, unused_operation_context, unused_outcome = _invoke(
- end, group, method, timeout, protocol_options, initial_metadata, payload,
- True)
- if with_call:
- return next(rendezvous), rendezvous
- else:
- return next(rendezvous)
-
-
-def future_unary_unary(
- end, group, method, timeout, protocol_options, initial_metadata, payload):
- """Services a value-in value-out servicer method by returning a Future."""
- rendezvous, unused_operation_context, unused_outcome = _invoke(
- end, group, method, timeout, protocol_options, initial_metadata, payload,
- True)
- return rendezvous
-
-
-def inline_unary_stream(
- end, group, method, timeout, protocol_options, initial_metadata, payload):
- """Services a value-in stream-out servicer method."""
- rendezvous, unused_operation_context, unused_outcome = _invoke(
- end, group, method, timeout, protocol_options, initial_metadata, payload,
- True)
- return rendezvous
-
-
-def blocking_stream_unary(
- end, group, method, timeout, with_call, protocol_options, initial_metadata,
- payload_iterator, pool):
- """Services in a blocking fashion a stream-in value-out servicer method."""
- rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, protocol_options, initial_metadata, None,
- False)
- if outcome is None:
- def in_pool():
- for payload in payload_iterator:
- rendezvous.consume(payload)
- rendezvous.terminate()
- pool.submit(_control.pool_wrap(in_pool, operation_context))
- if with_call:
- return next(rendezvous), rendezvous
- else:
- return next(rendezvous)
- else:
- if with_call:
- return next(rendezvous), rendezvous
- else:
- return next(rendezvous)
-
-
-def future_stream_unary(
- end, group, method, timeout, protocol_options, initial_metadata,
- payload_iterator, pool):
- """Services a stream-in value-out servicer method by returning a Future."""
- rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, protocol_options, initial_metadata, None,
- False)
- if outcome is None:
- def in_pool():
- for payload in payload_iterator:
- rendezvous.consume(payload)
- rendezvous.terminate()
- pool.submit(_control.pool_wrap(in_pool, operation_context))
- return rendezvous
-
-
-def inline_stream_stream(
- end, group, method, timeout, protocol_options, initial_metadata,
- payload_iterator, pool):
- """Services a stream-in stream-out servicer method."""
- rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, protocol_options, initial_metadata, None,
- False)
- if outcome is None:
- def in_pool():
- for payload in payload_iterator:
- rendezvous.consume(payload)
- rendezvous.terminate()
- pool.submit(_control.pool_wrap(in_pool, operation_context))
- return rendezvous
-
-
-def event_unary_unary(
- end, group, method, timeout, protocol_options, initial_metadata, payload,
- receiver, abortion_callback, pool):
- rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, protocol_options, initial_metadata, payload,
- True)
- return _event_return_unary(
- receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
-
-
-def event_unary_stream(
- end, group, method, timeout, protocol_options, initial_metadata, payload,
- receiver, abortion_callback, pool):
- rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, protocol_options, initial_metadata, payload,
- True)
- return _event_return_stream(
- receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
-
-
-def event_stream_unary(
- end, group, method, timeout, protocol_options, initial_metadata, receiver,
- abortion_callback, pool):
- rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, protocol_options, initial_metadata, None,
- False)
- return _event_return_unary(
- receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
-
-
-def event_stream_stream(
- end, group, method, timeout, protocol_options, initial_metadata, receiver,
- abortion_callback, pool):
- rendezvous, operation_context, outcome = _invoke(
- end, group, method, timeout, protocol_options, initial_metadata, None,
- False)
- return _event_return_stream(
- receiver, abortion_callback, rendezvous, operation_context, outcome, pool)
diff --git a/src/python/grpcio/grpc/framework/crust/_control.py b/src/python/grpcio/grpc/framework/crust/_control.py
deleted file mode 100644
index 9b4167bda0..0000000000
--- a/src/python/grpcio/grpc/framework/crust/_control.py
+++ /dev/null
@@ -1,584 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""State and behavior for translating between sync and async control flow."""
-
-import collections
-import enum
-import sys
-import threading
-import time
-
-from grpc.framework.foundation import abandonment
-from grpc.framework.foundation import callable_util
-from grpc.framework.foundation import future
-from grpc.framework.foundation import stream
-from grpc.framework.interfaces.base import base
-from grpc.framework.interfaces.base import utilities
-from grpc.framework.interfaces.face import face
-
-_DONE_CALLBACK_LOG_MESSAGE = 'Exception calling Future "done" callback!'
-_INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Crust) Internal Error! )-:'
-
-_CANNOT_SET_INITIAL_METADATA = (
- 'Could not set initial metadata - has it already been set, or has a ' +
- 'payload already been sent?')
-_CANNOT_SET_TERMINAL_METADATA = (
- 'Could not set terminal metadata - has it already been set, or has RPC ' +
- 'completion already been indicated?')
-_CANNOT_SET_CODE = (
- 'Could not set code - has it already been set, or has RPC completion ' +
- 'already been indicated?')
-_CANNOT_SET_DETAILS = (
- 'Could not set details - has it already been set, or has RPC completion ' +
- 'already been indicated?')
-
-
-class _DummyOperator(base.Operator):
-
- def advance(
- self, initial_metadata=None, payload=None, completion=None,
- allowance=None):
- pass
-
-_DUMMY_OPERATOR = _DummyOperator()
-
-
-class _Awaited(
- collections.namedtuple('_Awaited', ('kind', 'value',))):
-
- @enum.unique
- class Kind(enum.Enum):
- NOT_YET_ARRIVED = 'not yet arrived'
- ARRIVED = 'arrived'
-
-_NOT_YET_ARRIVED = _Awaited(_Awaited.Kind.NOT_YET_ARRIVED, None)
-_ARRIVED_AND_NONE = _Awaited(_Awaited.Kind.ARRIVED, None)
-
-
-class _Transitory(
- collections.namedtuple('_Transitory', ('kind', 'value',))):
-
- @enum.unique
- class Kind(enum.Enum):
- NOT_YET_SEEN = 'not yet seen'
- PRESENT = 'present'
- GONE = 'gone'
-
-_NOT_YET_SEEN = _Transitory(_Transitory.Kind.NOT_YET_SEEN, None)
-_GONE = _Transitory(_Transitory.Kind.GONE, None)
-
-
-class _Termination(
- collections.namedtuple(
- '_Termination', ('terminated', 'abortion', 'abortion_error',))):
- """Values indicating whether and how an RPC has terminated.
-
- Attributes:
- terminated: A boolean indicating whether or not the RPC has terminated.
- abortion: A face.Abortion value describing the RPC's abortion or None if the
- RPC did not abort.
- abortion_error: A face.AbortionError describing the RPC's abortion or None
- if the RPC did not abort.
- """
-
-_NOT_TERMINATED = _Termination(False, None, None)
-
-_OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR = {
- base.Outcome.Kind.COMPLETED: lambda *unused_args: _Termination(
- True, None, None),
- base.Outcome.Kind.CANCELLED: lambda *args: _Termination(
- True, face.Abortion(face.Abortion.Kind.CANCELLED, *args),
- face.CancellationError(*args)),
- base.Outcome.Kind.EXPIRED: lambda *args: _Termination(
- True, face.Abortion(face.Abortion.Kind.EXPIRED, *args),
- face.ExpirationError(*args)),
- base.Outcome.Kind.LOCAL_SHUTDOWN: lambda *args: _Termination(
- True, face.Abortion(face.Abortion.Kind.LOCAL_SHUTDOWN, *args),
- face.LocalShutdownError(*args)),
- base.Outcome.Kind.REMOTE_SHUTDOWN: lambda *args: _Termination(
- True, face.Abortion(face.Abortion.Kind.REMOTE_SHUTDOWN, *args),
- face.RemoteShutdownError(*args)),
- base.Outcome.Kind.RECEPTION_FAILURE: lambda *args: _Termination(
- True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
- face.NetworkError(*args)),
- base.Outcome.Kind.TRANSMISSION_FAILURE: lambda *args: _Termination(
- True, face.Abortion(face.Abortion.Kind.NETWORK_FAILURE, *args),
- face.NetworkError(*args)),
- base.Outcome.Kind.LOCAL_FAILURE: lambda *args: _Termination(
- True, face.Abortion(face.Abortion.Kind.LOCAL_FAILURE, *args),
- face.LocalError(*args)),
- base.Outcome.Kind.REMOTE_FAILURE: lambda *args: _Termination(
- True, face.Abortion(face.Abortion.Kind.REMOTE_FAILURE, *args),
- face.RemoteError(*args)),
-}
-
-
-def _wait_once_until(condition, until):
- if until is None:
- condition.wait()
- else:
- remaining = until - time.time()
- if remaining < 0:
- raise future.TimeoutError()
- else:
- condition.wait(timeout=remaining)
-
-
-def _done_callback_as_operation_termination_callback(
- done_callback, rendezvous):
- def operation_termination_callback(operation_outcome):
- rendezvous.set_outcome(operation_outcome)
- done_callback(rendezvous)
- return operation_termination_callback
-
-
-def _abortion_callback_as_operation_termination_callback(
- rpc_abortion_callback, rendezvous_set_outcome):
- def operation_termination_callback(operation_outcome):
- termination = rendezvous_set_outcome(operation_outcome)
- if termination.abortion is not None:
- rpc_abortion_callback(termination.abortion)
- return operation_termination_callback
-
-
-class Rendezvous(base.Operator, future.Future, stream.Consumer, face.Call):
- """A rendez-vous for the threads of an operation.
-
- Instances of this object present iterator and stream.Consumer interfaces for
- interacting with application code and present a base.Operator interface and
- maintain a base.Operator internally for interacting with base interface code.
- """
-
- def __init__(self, operator, operation_context):
- self._condition = threading.Condition()
-
- self._operator = operator
- self._operation_context = operation_context
-
- self._protocol_context = _NOT_YET_ARRIVED
-
- self._up_initial_metadata = _NOT_YET_ARRIVED
- self._up_payload = None
- self._up_allowance = 1
- self._up_completion = _NOT_YET_ARRIVED
- self._down_initial_metadata = _NOT_YET_SEEN
- self._down_payload = None
- self._down_allowance = 1
- self._down_terminal_metadata = _NOT_YET_SEEN
- self._down_code = _NOT_YET_SEEN
- self._down_details = _NOT_YET_SEEN
-
- self._termination = _NOT_TERMINATED
-
- # The semantics of future.Future.cancel and future.Future.cancelled are
- # slightly wonky, so they have to be tracked separately from the rest of the
- # result of the RPC. This field tracks whether cancellation was requested
- # prior to termination of the RPC
- self._cancelled = False
-
- def set_operator_and_context(self, operator, operation_context):
- with self._condition:
- self._operator = operator
- self._operation_context = operation_context
-
- def _down_completion(self):
- if self._down_terminal_metadata.kind is _Transitory.Kind.NOT_YET_SEEN:
- terminal_metadata = None
- self._down_terminal_metadata = _GONE
- elif self._down_terminal_metadata.kind is _Transitory.Kind.PRESENT:
- terminal_metadata = self._down_terminal_metadata.value
- self._down_terminal_metadata = _GONE
- else:
- terminal_metadata = None
- if self._down_code.kind is _Transitory.Kind.NOT_YET_SEEN:
- code = None
- self._down_code = _GONE
- elif self._down_code.kind is _Transitory.Kind.PRESENT:
- code = self._down_code.value
- self._down_code = _GONE
- else:
- code = None
- if self._down_details.kind is _Transitory.Kind.NOT_YET_SEEN:
- details = None
- self._down_details = _GONE
- elif self._down_details.kind is _Transitory.Kind.PRESENT:
- details = self._down_details.value
- self._down_details = _GONE
- else:
- details = None
- return utilities.completion(terminal_metadata, code, details)
-
- def _set_outcome(self, outcome):
- if not self._termination.terminated:
- self._operator = _DUMMY_OPERATOR
- self._operation_context = None
- self._down_initial_metadata = _GONE
- self._down_payload = None
- self._down_terminal_metadata = _GONE
- self._down_code = _GONE
- self._down_details = _GONE
-
- if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
- initial_metadata = None
- else:
- initial_metadata = self._up_initial_metadata.value
- if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
- terminal_metadata = None
- else:
- terminal_metadata = self._up_completion.value.terminal_metadata
- if outcome.kind is base.Outcome.Kind.COMPLETED:
- code = self._up_completion.value.code
- details = self._up_completion.value.message
- else:
- code = outcome.code
- details = outcome.details
- self._termination = _OPERATION_OUTCOME_KIND_TO_TERMINATION_CONSTRUCTOR[
- outcome.kind](initial_metadata, terminal_metadata, code, details)
-
- self._condition.notify_all()
-
- return self._termination
-
- def advance(
- self, initial_metadata=None, payload=None, completion=None,
- allowance=None):
- with self._condition:
- if initial_metadata is not None:
- self._up_initial_metadata = _Awaited(
- _Awaited.Kind.ARRIVED, initial_metadata)
- if payload is not None:
- if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
- self._up_initial_metadata = _ARRIVED_AND_NONE
- self._up_payload = payload
- self._up_allowance -= 1
- if completion is not None:
- if self._up_initial_metadata.kind is _Awaited.Kind.NOT_YET_ARRIVED:
- self._up_initial_metadata = _ARRIVED_AND_NONE
- self._up_completion = _Awaited(
- _Awaited.Kind.ARRIVED, completion)
- if allowance is not None:
- if self._down_payload is not None:
- self._operator.advance(payload=self._down_payload)
- self._down_payload = None
- self._down_allowance += allowance - 1
- else:
- self._down_allowance += allowance
- self._condition.notify_all()
-
- def cancel(self):
- with self._condition:
- if self._operation_context is not None:
- self._operation_context.cancel()
- self._cancelled = True
- return False
-
- def cancelled(self):
- with self._condition:
- return self._cancelled
-
- def running(self):
- with self._condition:
- return not self._termination.terminated
-
- def done(self):
- with self._condition:
- return self._termination.terminated
-
- def result(self, timeout=None):
- until = None if timeout is None else time.time() + timeout
- with self._condition:
- while True:
- if self._termination.terminated:
- if self._termination.abortion is None:
- return self._up_payload
- elif self._termination.abortion.kind is face.Abortion.Kind.CANCELLED:
- raise future.CancelledError()
- else:
- raise self._termination.abortion_error # pylint: disable=raising-bad-type
- else:
- _wait_once_until(self._condition, until)
-
- def exception(self, timeout=None):
- until = None if timeout is None else time.time() + timeout
- with self._condition:
- while True:
- if self._termination.terminated:
- if self._termination.abortion is None:
- return None
- else:
- return self._termination.abortion_error
- else:
- _wait_once_until(self._condition, until)
-
- def traceback(self, timeout=None):
- until = None if timeout is None else time.time() + timeout
- with self._condition:
- while True:
- if self._termination.terminated:
- if self._termination.abortion_error is None:
- return None
- else:
- abortion_error = self._termination.abortion_error
- break
- else:
- _wait_once_until(self._condition, until)
-
- try:
- raise abortion_error
- except face.AbortionError:
- return sys.exc_info()[2]
-
- def add_done_callback(self, fn):
- with self._condition:
- if self._operation_context is not None:
- outcome = self._operation_context.add_termination_callback(
- _done_callback_as_operation_termination_callback(fn, self))
- if outcome is None:
- return
- else:
- self._set_outcome(outcome)
-
- fn(self)
-
- def consume(self, value):
- with self._condition:
- while True:
- if self._termination.terminated:
- return
- elif 0 < self._down_allowance:
- self._operator.advance(payload=value)
- self._down_allowance -= 1
- return
- else:
- self._condition.wait()
-
- def terminate(self):
- with self._condition:
- if self._termination.terminated:
- return
- elif self._down_code.kind is _Transitory.Kind.GONE:
- # Conform to specified idempotence of terminate by ignoring extra calls.
- return
- else:
- completion = self._down_completion()
- self._operator.advance(completion=completion)
-
- def consume_and_terminate(self, value):
- with self._condition:
- while True:
- if self._termination.terminated:
- return
- elif 0 < self._down_allowance:
- completion = self._down_completion()
- self._operator.advance(payload=value, completion=completion)
- return
- else:
- self._condition.wait()
-
- def __iter__(self):
- return self
-
- def __next__(self):
- return self.next()
-
- def next(self):
- with self._condition:
- while True:
- if self._termination.abortion_error is not None:
- raise self._termination.abortion_error
- elif self._up_payload is not None:
- payload = self._up_payload
- self._up_payload = None
- if self._up_completion.kind is _Awaited.Kind.NOT_YET_ARRIVED:
- self._operator.advance(allowance=1)
- return payload
- elif self._up_completion.kind is _Awaited.Kind.ARRIVED:
- raise StopIteration()
- else:
- self._condition.wait()
-
- def is_active(self):
- with self._condition:
- return not self._termination.terminated
-
- def time_remaining(self):
- if self._operation_context is None:
- return 0
- else:
- return self._operation_context.time_remaining()
-
- def add_abortion_callback(self, abortion_callback):
- with self._condition:
- if self._operation_context is None:
- return self._termination.abortion
- else:
- outcome = self._operation_context.add_termination_callback(
- _abortion_callback_as_operation_termination_callback(
- abortion_callback, self.set_outcome))
- if outcome is not None:
- return self._set_outcome(outcome).abortion
- else:
- return self._termination.abortion
-
- def protocol_context(self):
- with self._condition:
- while True:
- if self._protocol_context.kind is _Awaited.Kind.ARRIVED:
- return self._protocol_context.value
- elif self._termination.abortion_error is not None:
- raise self._termination.abortion_error
- else:
- self._condition.wait()
-
- def initial_metadata(self):
- with self._condition:
- while True:
- if self._up_initial_metadata.kind is _Awaited.Kind.ARRIVED:
- return self._up_initial_metadata.value
- elif self._termination.terminated:
- return None
- else:
- self._condition.wait()
-
- def terminal_metadata(self):
- with self._condition:
- while True:
- if self._up_completion.kind is _Awaited.Kind.ARRIVED:
- return self._up_completion.value.terminal_metadata
- elif self._termination.terminated:
- return None
- else:
- self._condition.wait()
-
- def code(self):
- with self._condition:
- while True:
- if self._up_completion.kind is _Awaited.Kind.ARRIVED:
- return self._up_completion.value.code
- elif self._termination.terminated:
- return None
- else:
- self._condition.wait()
-
- def details(self):
- with self._condition:
- while True:
- if self._up_completion.kind is _Awaited.Kind.ARRIVED:
- return self._up_completion.value.message
- elif self._termination.terminated:
- return None
- else:
- self._condition.wait()
-
- def set_initial_metadata(self, initial_metadata):
- with self._condition:
- if (self._down_initial_metadata.kind is not
- _Transitory.Kind.NOT_YET_SEEN):
- raise ValueError(_CANNOT_SET_INITIAL_METADATA)
- else:
- self._down_initial_metadata = _GONE
- self._operator.advance(initial_metadata=initial_metadata)
-
- def set_terminal_metadata(self, terminal_metadata):
- with self._condition:
- if (self._down_terminal_metadata.kind is not
- _Transitory.Kind.NOT_YET_SEEN):
- raise ValueError(_CANNOT_SET_TERMINAL_METADATA)
- else:
- self._down_terminal_metadata = _Transitory(
- _Transitory.Kind.PRESENT, terminal_metadata)
-
- def set_code(self, code):
- with self._condition:
- if self._down_code.kind is not _Transitory.Kind.NOT_YET_SEEN:
- raise ValueError(_CANNOT_SET_CODE)
- else:
- self._down_code = _Transitory(_Transitory.Kind.PRESENT, code)
-
- def set_details(self, details):
- with self._condition:
- if self._down_details.kind is not _Transitory.Kind.NOT_YET_SEEN:
- raise ValueError(_CANNOT_SET_DETAILS)
- else:
- self._down_details = _Transitory(_Transitory.Kind.PRESENT, details)
-
- def set_protocol_context(self, protocol_context):
- with self._condition:
- self._protocol_context = _Awaited(
- _Awaited.Kind.ARRIVED, protocol_context)
- self._condition.notify_all()
-
- def set_outcome(self, outcome):
- with self._condition:
- return self._set_outcome(outcome)
-
-
-class _ProtocolReceiver(base.ProtocolReceiver):
-
- def __init__(self, rendezvous):
- self._rendezvous = rendezvous
-
- def context(self, protocol_context):
- self._rendezvous.set_protocol_context(protocol_context)
-
-
-def protocol_receiver(rendezvous):
- return _ProtocolReceiver(rendezvous)
-
-
-def pool_wrap(behavior, operation_context):
- """Wraps an operation-related behavior so that it may be called in a pool.
-
- Args:
- behavior: A callable related to carrying out an operation.
- operation_context: A base_interfaces.OperationContext for the operation.
-
- Returns:
- A callable that when called carries out the behavior of the given callable
- and handles whatever exceptions it raises appropriately.
- """
- def translation(*args):
- try:
- behavior(*args)
- except (
- abandonment.Abandoned,
- face.CancellationError,
- face.ExpirationError,
- face.LocalShutdownError,
- face.RemoteShutdownError,
- face.NetworkError,
- face.RemoteError,
- ) as e:
- if operation_context.outcome() is None:
- operation_context.fail(e)
- except Exception as e:
- operation_context.fail(e)
- return callable_util.with_exceptions_logged(
- translation, _INTERNAL_ERROR_LOG_MESSAGE)
diff --git a/src/python/grpcio/grpc/framework/crust/_service.py b/src/python/grpcio/grpc/framework/crust/_service.py
deleted file mode 100644
index 9903415c09..0000000000
--- a/src/python/grpcio/grpc/framework/crust/_service.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Behaviors for servicing RPCs."""
-
-from grpc.framework.crust import _control
-from grpc.framework.foundation import abandonment
-from grpc.framework.interfaces.base import utilities
-from grpc.framework.interfaces.face import face
-
-
-class _ServicerContext(face.ServicerContext):
-
- def __init__(self, rendezvous):
- self._rendezvous = rendezvous
-
- def is_active(self):
- return self._rendezvous.is_active()
-
- def time_remaining(self):
- return self._rendezvous.time_remaining()
-
- def add_abortion_callback(self, abortion_callback):
- return self._rendezvous.add_abortion_callback(abortion_callback)
-
- def cancel(self):
- self._rendezvous.cancel()
-
- def protocol_context(self):
- return self._rendezvous.protocol_context()
-
- def invocation_metadata(self):
- return self._rendezvous.initial_metadata()
-
- def initial_metadata(self, initial_metadata):
- self._rendezvous.set_initial_metadata(initial_metadata)
-
- def terminal_metadata(self, terminal_metadata):
- self._rendezvous.set_terminal_metadata(terminal_metadata)
-
- def code(self, code):
- self._rendezvous.set_code(code)
-
- def details(self, details):
- self._rendezvous.set_details(details)
-
-
-def _adaptation(pool, in_pool):
- def adaptation(operator, operation_context):
- rendezvous = _control.Rendezvous(operator, operation_context)
- subscription = utilities.full_subscription(
- rendezvous, _control.protocol_receiver(rendezvous))
- outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
- if outcome is None:
- pool.submit(_control.pool_wrap(in_pool, operation_context), rendezvous)
- return subscription
- else:
- raise abandonment.Abandoned()
- return adaptation
-
-
-def adapt_inline_unary_unary(method, pool):
- def in_pool(rendezvous):
- request = next(rendezvous)
- response = method(request, _ServicerContext(rendezvous))
- rendezvous.consume_and_terminate(response)
- return _adaptation(pool, in_pool)
-
-
-def adapt_inline_unary_stream(method, pool):
- def in_pool(rendezvous):
- request = next(rendezvous)
- response_iterator = method(request, _ServicerContext(rendezvous))
- for response in response_iterator:
- rendezvous.consume(response)
- rendezvous.terminate()
- return _adaptation(pool, in_pool)
-
-
-def adapt_inline_stream_unary(method, pool):
- def in_pool(rendezvous):
- response = method(rendezvous, _ServicerContext(rendezvous))
- rendezvous.consume_and_terminate(response)
- return _adaptation(pool, in_pool)
-
-
-def adapt_inline_stream_stream(method, pool):
- def in_pool(rendezvous):
- response_iterator = method(rendezvous, _ServicerContext(rendezvous))
- for response in response_iterator:
- rendezvous.consume(response)
- rendezvous.terminate()
- return _adaptation(pool, in_pool)
-
-
-def adapt_event_unary_unary(method, pool):
- def in_pool(rendezvous):
- request = next(rendezvous)
- method(
- request, rendezvous.consume_and_terminate, _ServicerContext(rendezvous))
- return _adaptation(pool, in_pool)
-
-
-def adapt_event_unary_stream(method, pool):
- def in_pool(rendezvous):
- request = next(rendezvous)
- method(request, rendezvous, _ServicerContext(rendezvous))
- return _adaptation(pool, in_pool)
-
-
-def adapt_event_stream_unary(method, pool):
- def in_pool(rendezvous):
- request_consumer = method(
- rendezvous.consume_and_terminate, _ServicerContext(rendezvous))
- for request in rendezvous:
- request_consumer.consume(request)
- request_consumer.terminate()
- return _adaptation(pool, in_pool)
-
-
-def adapt_event_stream_stream(method, pool):
- def in_pool(rendezvous):
- request_consumer = method(rendezvous, _ServicerContext(rendezvous))
- for request in rendezvous:
- request_consumer.consume(request)
- request_consumer.terminate()
- return _adaptation(pool, in_pool)
-
-
-def adapt_multi_method(multi_method, pool):
- def adaptation(group, method, operator, operation_context):
- rendezvous = _control.Rendezvous(operator, operation_context)
- subscription = utilities.full_subscription(
- rendezvous, _control.protocol_receiver(rendezvous))
- outcome = operation_context.add_termination_callback(rendezvous.set_outcome)
- if outcome is None:
- def in_pool():
- request_consumer = multi_method.service(
- group, method, rendezvous, _ServicerContext(rendezvous))
- for request in rendezvous:
- request_consumer.consume(request)
- request_consumer.terminate()
- pool.submit(_control.pool_wrap(in_pool, operation_context), rendezvous)
- return subscription
- else:
- raise abandonment.Abandoned()
- return adaptation
diff --git a/src/python/grpcio/grpc/framework/crust/implementations.py b/src/python/grpcio/grpc/framework/crust/implementations.py
deleted file mode 100644
index 2d3ab733b6..0000000000
--- a/src/python/grpcio/grpc/framework/crust/implementations.py
+++ /dev/null
@@ -1,366 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Entry points into the Crust layer of RPC Framework."""
-
-import six
-
-from grpc.framework.common import cardinality
-from grpc.framework.common import style
-from grpc.framework.crust import _calls
-from grpc.framework.crust import _service
-from grpc.framework.interfaces.base import base
-from grpc.framework.interfaces.face import face
-
-
-class _BaseServicer(base.Servicer):
-
- def __init__(self, adapted_methods, adapted_multi_method):
- self._adapted_methods = adapted_methods
- self._adapted_multi_method = adapted_multi_method
-
- def service(self, group, method, context, output_operator):
- adapted_method = self._adapted_methods.get((group, method), None)
- if adapted_method is not None:
- return adapted_method(output_operator, context)
- elif self._adapted_multi_method is not None:
- try:
- return self._adapted_multi_method(
- group, method, output_operator, context)
- except face.NoSuchMethodError:
- raise base.NoSuchMethodError(None, None)
- else:
- raise base.NoSuchMethodError(None, None)
-
-
-class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
-
- def __init__(self, end, group, method, pool):
- self._end = end
- self._group = group
- self._method = method
- self._pool = pool
-
- def __call__(
- self, request, timeout, metadata=None, with_call=False,
- protocol_options=None):
- return _calls.blocking_unary_unary(
- self._end, self._group, self._method, timeout, with_call,
- protocol_options, metadata, request)
-
- def future(self, request, timeout, metadata=None, protocol_options=None):
- return _calls.future_unary_unary(
- self._end, self._group, self._method, timeout, protocol_options,
- metadata, request)
-
- def event(
- self, request, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- return _calls.event_unary_unary(
- self._end, self._group, self._method, timeout, protocol_options,
- metadata, request, receiver, abortion_callback, self._pool)
-
-
-class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
-
- def __init__(self, end, group, method, pool):
- self._end = end
- self._group = group
- self._method = method
- self._pool = pool
-
- def __call__(self, request, timeout, metadata=None, protocol_options=None):
- return _calls.inline_unary_stream(
- self._end, self._group, self._method, timeout, protocol_options,
- metadata, request)
-
- def event(
- self, request, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- return _calls.event_unary_stream(
- self._end, self._group, self._method, timeout, protocol_options,
- metadata, request, receiver, abortion_callback, self._pool)
-
-
-class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
-
- def __init__(self, end, group, method, pool):
- self._end = end
- self._group = group
- self._method = method
- self._pool = pool
-
- def __call__(
- self, request_iterator, timeout, metadata=None,
- with_call=False, protocol_options=None):
- return _calls.blocking_stream_unary(
- self._end, self._group, self._method, timeout, with_call,
- protocol_options, metadata, request_iterator, self._pool)
-
- def future(
- self, request_iterator, timeout, metadata=None, protocol_options=None):
- return _calls.future_stream_unary(
- self._end, self._group, self._method, timeout, protocol_options,
- metadata, request_iterator, self._pool)
-
- def event(
- self, receiver, abortion_callback, timeout, metadata=None,
- protocol_options=None):
- return _calls.event_stream_unary(
- self._end, self._group, self._method, timeout, protocol_options,
- metadata, receiver, abortion_callback, self._pool)
-
-
-class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
-
- def __init__(self, end, group, method, pool):
- self._end = end
- self._group = group
- self._method = method
- self._pool = pool
-
- def __call__(
- self, request_iterator, timeout, metadata=None, protocol_options=None):
- return _calls.inline_stream_stream(
- self._end, self._group, self._method, timeout, protocol_options,
- metadata, request_iterator, self._pool)
-
- def event(
- self, receiver, abortion_callback, timeout, metadata=None,
- protocol_options=None):
- return _calls.event_stream_stream(
- self._end, self._group, self._method, timeout, protocol_options,
- metadata, receiver, abortion_callback, self._pool)
-
-
-class _GenericStub(face.GenericStub):
- """An face.GenericStub implementation."""
-
- def __init__(self, end, pool):
- self._end = end
- self._pool = pool
-
- def blocking_unary_unary(
- self, group, method, request, timeout, metadata=None,
- with_call=None, protocol_options=None):
- return _calls.blocking_unary_unary(
- self._end, group, method, timeout, with_call, protocol_options,
- metadata, request)
-
- def future_unary_unary(
- self, group, method, request, timeout, metadata=None,
- protocol_options=None):
- return _calls.future_unary_unary(
- self._end, group, method, timeout, protocol_options, metadata, request)
-
- def inline_unary_stream(
- self, group, method, request, timeout, metadata=None,
- protocol_options=None):
- return _calls.inline_unary_stream(
- self._end, group, method, timeout, protocol_options, metadata, request)
-
- def blocking_stream_unary(
- self, group, method, request_iterator, timeout, metadata=None,
- with_call=None, protocol_options=None):
- return _calls.blocking_stream_unary(
- self._end, group, method, timeout, with_call, protocol_options,
- metadata, request_iterator, self._pool)
-
- def future_stream_unary(
- self, group, method, request_iterator, timeout, metadata=None,
- protocol_options=None):
- return _calls.future_stream_unary(
- self._end, group, method, timeout, protocol_options, metadata,
- request_iterator, self._pool)
-
- def inline_stream_stream(
- self, group, method, request_iterator, timeout, metadata=None,
- protocol_options=None):
- return _calls.inline_stream_stream(
- self._end, group, method, timeout, protocol_options, metadata,
- request_iterator, self._pool)
-
- def event_unary_unary(
- self, group, method, request, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- return _calls.event_unary_unary(
- self._end, group, method, timeout, protocol_options, metadata, request,
- receiver, abortion_callback, self._pool)
-
- def event_unary_stream(
- self, group, method, request, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- return _calls.event_unary_stream(
- self._end, group, method, timeout, protocol_options, metadata, request,
- receiver, abortion_callback, self._pool)
-
- def event_stream_unary(
- self, group, method, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- return _calls.event_stream_unary(
- self._end, group, method, timeout, protocol_options, metadata, receiver,
- abortion_callback, self._pool)
-
- def event_stream_stream(
- self, group, method, receiver, abortion_callback, timeout,
- metadata=None, protocol_options=None):
- return _calls.event_stream_stream(
- self._end, group, method, timeout, protocol_options, metadata, receiver,
- abortion_callback, self._pool)
-
- def unary_unary(self, group, method):
- return _UnaryUnaryMultiCallable(self._end, group, method, self._pool)
-
- def unary_stream(self, group, method):
- return _UnaryStreamMultiCallable(self._end, group, method, self._pool)
-
- def stream_unary(self, group, method):
- return _StreamUnaryMultiCallable(self._end, group, method, self._pool)
-
- def stream_stream(self, group, method):
- return _StreamStreamMultiCallable(self._end, group, method, self._pool)
-
-
-class _DynamicStub(face.DynamicStub):
- """An face.DynamicStub implementation."""
-
- def __init__(self, end, group, cardinalities, pool):
- self._end = end
- self._group = group
- self._cardinalities = cardinalities
- self._pool = pool
-
- def __getattr__(self, attr):
- method_cardinality = self._cardinalities.get(attr)
- if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
- return _UnaryUnaryMultiCallable(self._end, self._group, attr, self._pool)
- elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
- return _UnaryStreamMultiCallable(self._end, self._group, attr, self._pool)
- elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
- return _StreamUnaryMultiCallable(self._end, self._group, attr, self._pool)
- elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
- return _StreamStreamMultiCallable(
- self._end, self._group, attr, self._pool)
- else:
- raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr)
-
-
-def _adapt_method_implementations(method_implementations, pool):
- adapted_implementations = {}
- for name, method_implementation in six.iteritems(method_implementations):
- if method_implementation.style is style.Service.INLINE:
- if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
- adapted_implementations[name] = _service.adapt_inline_unary_unary(
- method_implementation.unary_unary_inline, pool)
- elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
- adapted_implementations[name] = _service.adapt_inline_unary_stream(
- method_implementation.unary_stream_inline, pool)
- elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
- adapted_implementations[name] = _service.adapt_inline_stream_unary(
- method_implementation.stream_unary_inline, pool)
- elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
- adapted_implementations[name] = _service.adapt_inline_stream_stream(
- method_implementation.stream_stream_inline, pool)
- elif method_implementation.style is style.Service.EVENT:
- if method_implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
- adapted_implementations[name] = _service.adapt_event_unary_unary(
- method_implementation.unary_unary_event, pool)
- elif method_implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
- adapted_implementations[name] = _service.adapt_event_unary_stream(
- method_implementation.unary_stream_event, pool)
- elif method_implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
- adapted_implementations[name] = _service.adapt_event_stream_unary(
- method_implementation.stream_unary_event, pool)
- elif method_implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
- adapted_implementations[name] = _service.adapt_event_stream_stream(
- method_implementation.stream_stream_event, pool)
- return adapted_implementations
-
-
-def servicer(method_implementations, multi_method_implementation, pool):
- """Creates a base.Servicer.
-
- It is guaranteed that any passed face.MultiMethodImplementation will
- only be called to service an RPC if there is no
- face.MethodImplementation for the RPC method in the passed
- method_implementations dictionary.
-
- Args:
- method_implementations: A dictionary from RPC method name to
- face.MethodImplementation object to be used to service the named
- RPC method.
- multi_method_implementation: An face.MultiMethodImplementation to be
- used to service any RPCs not serviced by the
- face.MethodImplementations given in the method_implementations
- dictionary, or None.
- pool: A thread pool.
-
- Returns:
- A base.Servicer that services RPCs via the given implementations.
- """
- adapted_implementations = _adapt_method_implementations(
- method_implementations, pool)
- if multi_method_implementation is None:
- adapted_multi_method_implementation = None
- else:
- adapted_multi_method_implementation = _service.adapt_multi_method(
- multi_method_implementation, pool)
- return _BaseServicer(
- adapted_implementations, adapted_multi_method_implementation)
-
-
-def generic_stub(end, pool):
- """Creates an face.GenericStub.
-
- Args:
- end: A base.End.
- pool: A futures.ThreadPoolExecutor.
-
- Returns:
- A face.GenericStub that performs RPCs via the given base.End.
- """
- return _GenericStub(end, pool)
-
-
-def dynamic_stub(end, group, cardinalities, pool):
- """Creates an face.DynamicStub.
-
- Args:
- end: A base.End.
- group: The group identifier for all RPCs to be made with the created
- face.DynamicStub.
- cardinalities: A dict from method identifier to cardinality.Cardinality
- value identifying the cardinality of every RPC method to be supported by
- the created face.DynamicStub.
- pool: A futures.ThreadPoolExecutor.
-
- Returns:
- A face.DynamicStub that performs RPCs via the given base.End.
- """
- return _DynamicStub(end, group, cardinalities, pool)
diff --git a/src/python/grpcio/grpc/framework/foundation/_timer_future.py b/src/python/grpcio/grpc/framework/foundation/_timer_future.py
deleted file mode 100644
index 2c9996aa9d..0000000000
--- a/src/python/grpcio/grpc/framework/foundation/_timer_future.py
+++ /dev/null
@@ -1,228 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Affords a Future implementation based on Python's threading.Timer."""
-
-import sys
-import threading
-import time
-
-from grpc.framework.foundation import future
-
-
-class TimerFuture(future.Future):
- """A Future implementation based around Timer objects."""
-
- def __init__(self, compute_time, computation):
- """Constructor.
-
- Args:
- compute_time: The time after which to begin this future's computation.
- computation: The computation to be performed within this Future.
- """
- self._lock = threading.Lock()
- self._compute_time = compute_time
- self._computation = computation
- self._timer = None
- self._computing = False
- self._computed = False
- self._cancelled = False
- self._result = None
- self._exception = None
- self._traceback = None
- self._waiting = []
-
- def _compute(self):
- """Performs the computation embedded in this Future.
-
- Or doesn't, if the time to perform it has not yet arrived.
- """
- with self._lock:
- time_remaining = self._compute_time - time.time()
- if 0 < time_remaining:
- self._timer = threading.Timer(time_remaining, self._compute)
- self._timer.start()
- return
- else:
- self._computing = True
-
- try:
- return_value = self._computation()
- exception = None
- traceback = None
- except Exception as e: # pylint: disable=broad-except
- return_value = None
- exception = e
- traceback = sys.exc_info()[2]
-
- with self._lock:
- self._computing = False
- self._computed = True
- self._return_value = return_value
- self._exception = exception
- self._traceback = traceback
- waiting = self._waiting
-
- for callback in waiting:
- callback(self)
-
- def start(self):
- """Starts this Future.
-
- This must be called exactly once, immediately after construction.
- """
- with self._lock:
- self._timer = threading.Timer(
- self._compute_time - time.time(), self._compute)
- self._timer.start()
-
- def cancel(self):
- """See future.Future.cancel for specification."""
- with self._lock:
- if self._computing or self._computed:
- return False
- elif self._cancelled:
- return True
- else:
- self._timer.cancel()
- self._cancelled = True
- waiting = self._waiting
-
- for callback in waiting:
- try:
- callback(self)
- except Exception: # pylint: disable=broad-except
- pass
-
- return True
-
- def cancelled(self):
- """See future.Future.cancelled for specification."""
- with self._lock:
- return self._cancelled
-
- def running(self):
- """See future.Future.running for specification."""
- with self._lock:
- return not self._computed and not self._cancelled
-
- def done(self):
- """See future.Future.done for specification."""
- with self._lock:
- return self._computed or self._cancelled
-
- def result(self, timeout=None):
- """See future.Future.result for specification."""
- with self._lock:
- if self._cancelled:
- raise future.CancelledError()
- elif self._computed:
- if self._exception is None:
- return self._return_value
- else:
- raise self._exception # pylint: disable=raising-bad-type
-
- condition = threading.Condition()
- def notify_condition(unused_future):
- with condition:
- condition.notify()
- self._waiting.append(notify_condition)
-
- with condition:
- condition.wait(timeout=timeout)
-
- with self._lock:
- if self._cancelled:
- raise future.CancelledError()
- elif self._computed:
- if self._exception is None:
- return self._return_value
- else:
- raise self._exception # pylint: disable=raising-bad-type
- else:
- raise future.TimeoutError()
-
- def exception(self, timeout=None):
- """See future.Future.exception for specification."""
- with self._lock:
- if self._cancelled:
- raise future.CancelledError()
- elif self._computed:
- return self._exception
-
- condition = threading.Condition()
- def notify_condition(unused_future):
- with condition:
- condition.notify()
- self._waiting.append(notify_condition)
-
- with condition:
- condition.wait(timeout=timeout)
-
- with self._lock:
- if self._cancelled:
- raise future.CancelledError()
- elif self._computed:
- return self._exception
- else:
- raise future.TimeoutError()
-
- def traceback(self, timeout=None):
- """See future.Future.traceback for specification."""
- with self._lock:
- if self._cancelled:
- raise future.CancelledError()
- elif self._computed:
- return self._traceback
-
- condition = threading.Condition()
- def notify_condition(unused_future):
- with condition:
- condition.notify()
- self._waiting.append(notify_condition)
-
- with condition:
- condition.wait(timeout=timeout)
-
- with self._lock:
- if self._cancelled:
- raise future.CancelledError()
- elif self._computed:
- return self._traceback
- else:
- raise future.TimeoutError()
-
- def add_done_callback(self, fn):
- """See future.Future.add_done_callback for specification."""
- with self._lock:
- if not self._computed and not self._cancelled:
- self._waiting.append(fn)
- return
-
- fn(self)
diff --git a/src/python/grpcio/grpc/framework/foundation/activated.py b/src/python/grpcio/grpc/framework/foundation/activated.py
deleted file mode 100644
index 8b8e4f45b5..0000000000
--- a/src/python/grpcio/grpc/framework/foundation/activated.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Interfaces related to streams of values or objects."""
-
-import abc
-
-import six
-
-class Activated(six.with_metaclass(abc.ABCMeta)):
- """Interface for objects that may be started and stopped.
-
- Values implementing this type must also implement the context manager
- protocol.
- """
-
- @abc.abstractmethod
- def __enter__(self):
- """See the context manager protocol for specification."""
- raise NotImplementedError()
-
- @abc.abstractmethod
- def __exit__(self, exc_type, exc_val, exc_tb):
- """See the context manager protocol for specification."""
- raise NotImplementedError()
-
- @abc.abstractmethod
- def start(self):
- """Activates this object.
-
- Returns:
- A value equal to the value returned by this object's __enter__ method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def stop(self):
- """Deactivates this object."""
- raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/foundation/later.py b/src/python/grpcio/grpc/framework/foundation/later.py
deleted file mode 100644
index 1d1e065041..0000000000
--- a/src/python/grpcio/grpc/framework/foundation/later.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Enables scheduling execution at a later time."""
-
-import time
-
-from grpc.framework.foundation import _timer_future
-
-
-def later(delay, computation):
- """Schedules later execution of a callable.
-
- Args:
- delay: Any numeric value. Represents the minimum length of time in seconds
- to allow to pass before beginning the computation. No guarantees are made
- about the maximum length of time that will pass.
- computation: A callable that accepts no arguments.
-
- Returns:
- A Future representing the scheduled computation.
- """
- timer_future = _timer_future.TimerFuture(time.time() + delay, computation)
- timer_future.start()
- return timer_future
diff --git a/src/python/grpcio/grpc/framework/foundation/relay.py b/src/python/grpcio/grpc/framework/foundation/relay.py
deleted file mode 100644
index 20f41b2738..0000000000
--- a/src/python/grpcio/grpc/framework/foundation/relay.py
+++ /dev/null
@@ -1,174 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Implementations of in-order work deference."""
-
-import abc
-import enum
-import threading
-
-from grpc.framework.foundation import activated
-from grpc.framework.foundation import logging_pool
-
-_NULL_BEHAVIOR = lambda unused_value: None
-
-
-class Relay(object):
- """Performs work submitted to it in another thread.
-
- Performs work in the order in which work was submitted to it; otherwise there
- would be no reason to use an implementation of this interface instead of a
- thread pool.
- """
-
- @abc.abstractmethod
- def add_value(self, value):
- """Adds a value to be passed to the behavior registered with this Relay.
-
- Args:
- value: A value that will be passed to a call made in another thread to the
- behavior registered with this Relay.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def set_behavior(self, behavior):
- """Sets the behavior that this Relay should call when passed values.
-
- Args:
- behavior: The behavior that this Relay should call in another thread when
- passed a value, or None to have passed values ignored.
- """
- raise NotImplementedError()
-
-
-class _PoolRelay(activated.Activated, Relay):
-
- @enum.unique
- class _State(enum.Enum):
- INACTIVE = 'inactive'
- IDLE = 'idle'
- SPINNING = 'spinning'
-
- def __init__(self, pool, behavior):
- self._condition = threading.Condition()
- self._pool = pool
- self._own_pool = pool is None
- self._state = _PoolRelay._State.INACTIVE
- self._activated = False
- self._spinning = False
- self._values = []
- self._behavior = _NULL_BEHAVIOR if behavior is None else behavior
-
- def _spin(self, behavior, value):
- while True:
- behavior(value)
- with self._condition:
- if self._values:
- value = self._values.pop(0)
- behavior = self._behavior
- else:
- self._state = _PoolRelay._State.IDLE
- self._condition.notify_all()
- break
-
- def add_value(self, value):
- with self._condition:
- if self._state is _PoolRelay._State.INACTIVE:
- raise ValueError('add_value not valid on inactive Relay!')
- elif self._state is _PoolRelay._State.IDLE:
- self._pool.submit(self._spin, self._behavior, value)
- self._state = _PoolRelay._State.SPINNING
- else:
- self._values.append(value)
-
- def set_behavior(self, behavior):
- with self._condition:
- self._behavior = _NULL_BEHAVIOR if behavior is None else behavior
-
- def _start(self):
- with self._condition:
- self._state = _PoolRelay._State.IDLE
- if self._own_pool:
- self._pool = logging_pool.pool(1)
- return self
-
- def _stop(self):
- with self._condition:
- while self._state is _PoolRelay._State.SPINNING:
- self._condition.wait()
- if self._own_pool:
- self._pool.shutdown(wait=True)
- self._state = _PoolRelay._State.INACTIVE
-
- def __enter__(self):
- return self._start()
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._stop()
- return False
-
- def start(self):
- return self._start()
-
- def stop(self):
- self._stop()
-
-
-def relay(behavior):
- """Creates a Relay.
-
- Args:
- behavior: The behavior to be called by the created Relay, or None to have
- passed values dropped until a different behavior is given to the returned
- Relay later.
-
- Returns:
- An object that is both an activated.Activated and a Relay. The object is
- only valid for use as a Relay when activated.
- """
- return _PoolRelay(None, behavior)
-
-
-def pool_relay(pool, behavior):
- """Creates a Relay that uses a given thread pool.
-
- This object will make use of at most one thread in the given pool.
-
- Args:
- pool: A futures.ThreadPoolExecutor for use by the created Relay.
- behavior: The behavior to be called by the created Relay, or None to have
- passed values dropped until a different behavior is given to the returned
- Relay later.
-
- Returns:
- An object that is both an activated.Activated and a Relay. The object is
- only valid for use as a Relay when activated.
- """
- return _PoolRelay(pool, behavior)
diff --git a/src/python/grpcio/grpc/framework/interfaces/links/__init__.py b/src/python/grpcio/grpc/framework/interfaces/links/__init__.py
deleted file mode 100644
index 7086519106..0000000000
--- a/src/python/grpcio/grpc/framework/interfaces/links/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py
deleted file mode 100644
index 9631b19078..0000000000
--- a/src/python/grpcio/grpc/framework/interfaces/links/links.py
+++ /dev/null
@@ -1,143 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""The low-level ticket-exchanging-links interface of RPC Framework."""
-
-import abc
-import collections
-import enum
-
-import six
-
-
-class Protocol(collections.namedtuple('Protocol', ('kind', 'value',))):
- """A sum type for handles to a system that transmits tickets.
-
- Attributes:
- kind: A Kind value identifying the kind of value being passed.
- value: The value being passed between the high-level application and the
- system affording ticket transport.
- """
-
- @enum.unique
- class Kind(enum.Enum):
- CALL_OPTION = 'call option'
- SERVICER_CONTEXT = 'servicer context'
- INVOCATION_CONTEXT = 'invocation context'
-
-
-class Ticket(
- collections.namedtuple(
- 'Ticket',
- ('operation_id', 'sequence_number', 'group', 'method', 'subscription',
- 'timeout', 'allowance', 'initial_metadata', 'payload',
- 'terminal_metadata', 'code', 'message', 'termination', 'protocol',))):
- """A sum type for all values sent from a front to a back.
-
- Attributes:
- operation_id: A unique-with-respect-to-equality hashable object identifying
- a particular operation.
- sequence_number: A zero-indexed integer sequence number identifying the
- ticket's place in the stream of tickets sent in one direction for the
- particular operation.
- group: The group to which the method of the operation belongs. Must be
- present in the first ticket from invocation side to service side. Ignored
- for all other tickets exchanged during the operation.
- method: The name of an operation. Must be present in the first ticket from
- invocation side to service side. Ignored for all other tickets exchanged
- during the operation.
- subscription: A Subscription value describing the interest one side has in
- receiving information from the other side. Must be present in the first
- ticket from either side. Ignored for all other tickets exchanged during
- the operation.
- timeout: A nonzero length of time (measured from the beginning of the
- operation) to allow for the entire operation. Must be present in the first
- ticket from invocation side to service side. Optional for all other
- tickets exchanged during the operation. Receipt of a value from the other
- side of the operation indicates the value in use by that side. Setting a
- value on a later ticket allows either side to request time extensions (or
- even time reductions!) on in-progress operations.
- allowance: A positive integer granting permission for a number of payloads
- to be transmitted to the communicating side of the operation, or None if
- no additional allowance is being granted with this ticket.
- initial_metadata: An optional metadata value communicated from one side to
- the other at the beginning of the operation. May be non-None in at most
- one ticket from each side. Any non-None value must appear no later than
- the first payload value.
- payload: A customer payload object. May be None.
- terminal_metadata: A metadata value comminicated from one side to the other
- at the end of the operation. May be non-None in the same ticket as
- the code and message, but must be None for all earlier tickets.
- code: A value communicated at operation completion. May be None.
- message: A value communicated at operation completion. May be None.
- termination: A Termination value describing the end of the operation, or
- None if the operation has not yet terminated. If set, no further tickets
- may be sent in the same direction.
- protocol: A Protocol value or None, with further semantics being a matter
- between high-level application and underlying ticket transport.
- """
-
- @enum.unique
- class Subscription(enum.Enum):
- """Identifies the level of subscription of a side of an operation."""
-
- NONE = 'none'
- TERMINATION = 'termination'
- FULL = 'full'
-
- @enum.unique
- class Termination(enum.Enum):
- """Identifies the termination of an operation."""
-
- COMPLETION = 'completion'
- CANCELLATION = 'cancellation'
- EXPIRATION = 'expiration'
- SHUTDOWN = 'shutdown'
- RECEPTION_FAILURE = 'reception failure'
- TRANSMISSION_FAILURE = 'transmission failure'
- LOCAL_FAILURE = 'local failure'
- REMOTE_FAILURE = 'remote failure'
-
-
-class Link(six.with_metaclass(abc.ABCMeta)):
- """Accepts and emits tickets."""
-
- @abc.abstractmethod
- def accept_ticket(self, ticket):
- """Accept a Ticket.
-
- Args:
- ticket: Any Ticket.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def join_link(self, link):
- """Mates this object with a peer with which it will exchange tickets."""
- raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/framework/interfaces/links/utilities.py b/src/python/grpcio/grpc/framework/interfaces/links/utilities.py
deleted file mode 100644
index 6e4fd76d93..0000000000
--- a/src/python/grpcio/grpc/framework/interfaces/links/utilities.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Utilities provided as part of the links interface."""
-
-from grpc.framework.interfaces.links import links
-
-
-class _NullLink(links.Link):
- """A do-nothing links.Link."""
-
- def accept_ticket(self, ticket):
- pass
-
- def join_link(self, link):
- pass
-
-NULL_LINK = _NullLink()
diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py
index 0f4db9d972..d1e2d418c3 100644
--- a/src/python/grpcio/grpc_version.py
+++ b/src/python/grpcio/grpc_version.py
@@ -29,4 +29,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!!
-VERSION='0.16.0.dev0'
+VERSION='1.0.0rc1'