aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2015-03-10 00:52:32 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2015-03-10 00:52:32 +0000
commit31e65beaed4804c4c3926ae846286871077b39d2 (patch)
treecb5a0661b70bd83f26140330b29bd41b9ae9c3a1 /src/python
parentf9007045f25b5886b71f69fbbc23d5078f5f7466 (diff)
Eliminate the Python "assembly" package
This completes issue #726. All that cascading activation stuff just didn't work out as cleanly as I had hoped it would.
Diffstat (limited to 'src/python')
-rw-r--r--src/python/src/grpc/_adapter/fore.py87
-rw-r--r--src/python/src/grpc/_adapter/rear.py124
-rw-r--r--src/python/src/grpc/early_adopter/_reexport.py51
-rw-r--r--src/python/src/grpc/early_adopter/implementations.py152
-rw-r--r--src/python/src/grpc/framework/assembly/__init__.py30
-rw-r--r--src/python/src/grpc/framework/assembly/implementations.py264
-rw-r--r--src/python/src/grpc/framework/assembly/implementations_test.py288
-rw-r--r--src/python/src/grpc/framework/assembly/interfaces.py58
-rw-r--r--src/python/src/grpc/framework/face/implementations.py12
-rw-r--r--src/python/src/setup.py1
10 files changed, 141 insertions, 926 deletions
diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py
index 6ef9e60006..339c0ef216 100644
--- a/src/python/src/grpc/_adapter/fore.py
+++ b/src/python/src/grpc/_adapter/fore.py
@@ -357,90 +357,3 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated):
self._complete(ticket.operation_id, ticket.payload)
else:
self._cancel(ticket.operation_id)
-
-
-class _ActivatedForeLink(ticket_interfaces.ForeLink, activated.Activated):
-
- def __init__(
- self, port, request_deserializers, response_serializers,
- root_certificates, key_chain_pairs):
- self._port = port
- self._request_deserializers = request_deserializers
- self._response_serializers = response_serializers
- self._root_certificates = root_certificates
- self._key_chain_pairs = key_chain_pairs
-
- self._lock = threading.Lock()
- self._pool = None
- self._fore_link = None
- self._rear_link = null.NULL_REAR_LINK
-
- def join_rear_link(self, rear_link):
- with self._lock:
- self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
- if self._fore_link is not None:
- self._fore_link.join_rear_link(rear_link)
-
- def _start(self):
- with self._lock:
- self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
- self._fore_link = ForeLink(
- self._pool, self._request_deserializers, self._response_serializers,
- self._root_certificates, self._key_chain_pairs, port=self._port)
- self._fore_link.join_rear_link(self._rear_link)
- self._fore_link.start()
- return self
-
- def _stop(self):
- with self._lock:
- self._fore_link.stop()
- self._fore_link = None
- self._pool.shutdown(wait=True)
- self._pool = None
-
- def __enter__(self):
- return self._start()
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._stop()
- return False
-
- def start(self):
- return self._start()
-
- def stop(self):
- self._stop()
-
- def port(self):
- with self._lock:
- return None if self._fore_link is None else self._fore_link.port()
-
- def accept_back_to_front_ticket(self, ticket):
- with self._lock:
- if self._fore_link is not None:
- self._fore_link.accept_back_to_front_ticket(ticket)
-
-
-def activated_fore_link(
- port, request_deserializers, response_serializers, root_certificates,
- key_chain_pairs):
- """Creates a ForeLink that is also an activated.Activated.
-
- The returned object is only valid for use between calls to its start and stop
- methods (or in context when used as a context manager).
-
- Args:
- port: The port on which to serve RPCs, or None for a port to be
- automatically selected.
- request_deserializers: A dictionary from RPC method names to request object
- deserializer behaviors.
- response_serializers: A dictionary from RPC method names to response object
- serializer behaviors.
- root_certificates: The PEM-encoded client root certificates as a bytestring
- or None.
- key_chain_pairs: A sequence of PEM-encoded private key-certificate chain
- pairs.
- """
- return _ActivatedForeLink(
- port, request_deserializers, response_serializers, root_certificates,
- key_chain_pairs)
diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py
index fc71bf0a6c..62703fab30 100644
--- a/src/python/src/grpc/_adapter/rear.py
+++ b/src/python/src/grpc/_adapter/rear.py
@@ -387,127 +387,3 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated):
else:
# NOTE(nathaniel): All other categories are treated as cancellation.
self._cancel(ticket.operation_id)
-
-
-class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated):
-
- def __init__(
- self, host, port, request_serializers, response_deserializers, secure,
- root_certificates, private_key, certificate_chain,
- server_host_override=None):
- self._host = host
- self._port = port
- self._request_serializers = request_serializers
- self._response_deserializers = response_deserializers
- self._secure = secure
- self._root_certificates = root_certificates
- self._private_key = private_key
- self._certificate_chain = certificate_chain
- self._server_host_override = server_host_override
-
- self._lock = threading.Lock()
- self._pool = None
- self._rear_link = None
- self._fore_link = null.NULL_FORE_LINK
-
- def join_fore_link(self, fore_link):
- with self._lock:
- self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
- if self._rear_link is not None:
- self._rear_link.join_fore_link(self._fore_link)
-
- def _start(self):
- with self._lock:
- self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
- self._rear_link = RearLink(
- self._host, self._port, self._pool, self._request_serializers,
- self._response_deserializers, self._secure, self._root_certificates,
- self._private_key, self._certificate_chain,
- server_host_override=self._server_host_override)
- self._rear_link.join_fore_link(self._fore_link)
- self._rear_link.start()
- return self
-
- def _stop(self):
- with self._lock:
- self._rear_link.stop()
- self._rear_link = None
- self._pool.shutdown(wait=True)
- self._pool = None
-
- def __enter__(self):
- return self._start()
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._stop()
- return False
-
- def start(self):
- return self._start()
-
- def stop(self):
- self._stop()
-
- def accept_front_to_back_ticket(self, ticket):
- with self._lock:
- if self._rear_link is not None:
- self._rear_link.accept_front_to_back_ticket(ticket)
-
-
-# TODO(issue 726): reconcile these two creation functions.
-def activated_rear_link(
- host, port, request_serializers, response_deserializers):
- """Creates a RearLink that is also an activated.Activated.
-
- The returned object is only valid for use between calls to its start and stop
- methods (or in context when used as a context manager).
-
- Args:
- host: The host to which to connect for RPC service.
- port: The port to which to connect for RPC service.
- request_serializers: A dictionary from RPC method name to request object
- serializer behavior.
- response_deserializers: A dictionary from RPC method name to response
- object deserializer behavior.
- secure: A boolean indicating whether or not to use a secure connection.
- root_certificates: The PEM-encoded root certificates or None to ask for
- them to be retrieved from a default location.
- private_key: The PEM-encoded private key to use or None if no private key
- should be used.
- certificate_chain: The PEM-encoded certificate chain to use or None if no
- certificate chain should be used.
- """
- return _ActivatedRearLink(
- host, port, request_serializers, response_deserializers, False, None,
- None, None)
-
-
-
-def secure_activated_rear_link(
- host, port, request_serializers, response_deserializers, root_certificates,
- private_key, certificate_chain, server_host_override=None):
- """Creates a RearLink that is also an activated.Activated.
-
- The returned object is only valid for use between calls to its start and stop
- methods (or in context when used as a context manager).
-
- Args:
- host: The host to which to connect for RPC service.
- port: The port to which to connect for RPC service.
- request_serializers: A dictionary from RPC method name to request object
- serializer behavior.
- response_deserializers: A dictionary from RPC method name to response
- object deserializer behavior.
- root_certificates: The PEM-encoded root certificates or None to ask for
- them to be retrieved from a default location.
- private_key: The PEM-encoded private key to use or None if no private key
- should be used.
- certificate_chain: The PEM-encoded certificate chain to use or None if no
- certificate chain should be used.
- server_host_override: (For testing only) the target name used for SSL
- host name checking.
- """
- return _ActivatedRearLink(
- host, port, request_serializers, response_deserializers, True,
- root_certificates, private_key, certificate_chain,
- server_host_override=server_host_override)
diff --git a/src/python/src/grpc/early_adopter/_reexport.py b/src/python/src/grpc/early_adopter/_reexport.py
index 3fed8099f6..f3416028e8 100644
--- a/src/python/src/grpc/early_adopter/_reexport.py
+++ b/src/python/src/grpc/early_adopter/_reexport.py
@@ -174,45 +174,6 @@ class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync):
return _ReexportedFuture(self._underlying.future(request_iterator, timeout))
-class _Stub(interfaces.Stub):
-
- def __init__(self, assembly_stub, cardinalities):
- self._assembly_stub = assembly_stub
- self._cardinalities = cardinalities
-
- def __enter__(self):
- self._assembly_stub.__enter__()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._assembly_stub.__exit__(exc_type, exc_val, exc_tb)
- return False
-
- def __getattr__(self, attr):
- underlying_attr = self._assembly_stub.__getattr__(attr)
- method_cardinality = self._cardinalities.get(attr)
- # TODO(nathaniel): unify this trick with its other occurrence in the code.
- if method_cardinality is None:
- for name, method_cardinality in self._cardinalities.iteritems():
- last_slash_index = name.rfind('/')
- if 0 <= last_slash_index and name[last_slash_index + 1:] == attr:
- break
- else:
- raise AttributeError(attr)
- if method_cardinality is interfaces.Cardinality.UNARY_UNARY:
- return _UnaryUnarySyncAsync(underlying_attr)
- elif method_cardinality is interfaces.Cardinality.UNARY_STREAM:
- return lambda request, timeout: _CancellableIterator(
- underlying_attr(request, timeout))
- elif method_cardinality is interfaces.Cardinality.STREAM_UNARY:
- return _StreamUnarySyncAsync(underlying_attr)
- elif method_cardinality is interfaces.Cardinality.STREAM_STREAM:
- return lambda request_iterator, timeout: _CancellableIterator(
- underlying_attr(request_iterator, timeout))
- else:
- raise AttributeError(attr)
-
-
def common_cardinalities(early_adopter_cardinalities):
common_cardinalities = {}
for name, early_adopter_cardinality in early_adopter_cardinalities.iteritems():
@@ -225,5 +186,13 @@ def rpc_context(face_rpc_context):
return _RpcContext(face_rpc_context)
-def stub(face_stub, cardinalities):
- return _Stub(face_stub, cardinalities)
+def cancellable_iterator(face_cancellable_iterator):
+ return _CancellableIterator(face_cancellable_iterator)
+
+
+def unary_unary_sync_async(face_unary_unary_multi_callable):
+ return _UnaryUnarySyncAsync(face_unary_unary_multi_callable)
+
+
+def stream_unary_sync_async(face_stream_unary_multi_callable):
+ return _StreamUnarySyncAsync(face_stream_unary_multi_callable)
diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py
index b46f94e9fb..1c02f9e4d6 100644
--- a/src/python/src/grpc/early_adopter/implementations.py
+++ b/src/python/src/grpc/early_adopter/implementations.py
@@ -36,7 +36,13 @@ from grpc._adapter import rear as _rear
from grpc.early_adopter import _face_utilities
from grpc.early_adopter import _reexport
from grpc.early_adopter import interfaces
-from grpc.framework.assembly import implementations as _assembly_implementations
+from grpc.framework.base import util as _base_utilities
+from grpc.framework.base.packets import implementations as _tickets_implementations
+from grpc.framework.face import implementations as _face_implementations
+from grpc.framework.foundation import logging_pool
+
+_THREAD_POOL_SIZE = 80
+_ONE_DAY_IN_SECONDS = 24 * 60 * 60
class _Server(interfaces.Server):
@@ -50,30 +56,39 @@ class _Server(interfaces.Server):
else:
self._key_chain_pairs = ((private_key, certificate_chain),)
+ self._pool = None
+ self._back = None
self._fore_link = None
- self._server = None
def _start(self):
with self._lock:
- if self._server is None:
- self._fore_link = _fore.activated_fore_link(
- self._port, self._breakdown.request_deserializers,
+ if self._pool is None:
+ self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ servicer = _face_implementations.servicer(
+ self._pool, self._breakdown.implementations, None)
+ self._back = _tickets_implementations.back(
+ servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS,
+ _ONE_DAY_IN_SECONDS)
+ self._fore_link = _fore.ForeLink(
+ self._pool, self._breakdown.request_deserializers,
self._breakdown.response_serializers, None, self._key_chain_pairs)
-
- self._server = _assembly_implementations.assemble_service(
- self._breakdown.implementations, self._fore_link)
- self._server.start()
+ self._back.join_fore_link(self._fore_link)
+ self._fore_link.join_rear_link(self._back)
+ self._fore_link.start()
else:
raise ValueError('Server currently running!')
def _stop(self):
with self._lock:
- if self._server is None:
+ if self._pool is None:
raise ValueError('Server not running!')
else:
- self._server.stop()
- self._server = None
+ self._fore_link.stop()
+ _base_utilities.wait_for_idle(self._back)
+ self._pool.shutdown(wait=True)
self._fore_link = None
+ self._back = None
+ self._pool = None
def __enter__(self):
self._start()
@@ -93,11 +108,101 @@ class _Server(interfaces.Server):
with self._lock:
return self._fore_link.port()
-def _build_stub(breakdown, activated_rear_link):
- assembly_stub = _assembly_implementations.assemble_dynamic_inline_stub(
- _reexport.common_cardinalities(breakdown.cardinalities),
- activated_rear_link)
- return _reexport.stub(assembly_stub, breakdown.cardinalities)
+
+class _Stub(interfaces.Stub):
+
+ def __init__(
+ self, breakdown, host, port, secure, root_certificates, private_key,
+ certificate_chain, server_host_override=None):
+ self._lock = threading.Lock()
+ self._breakdown = breakdown
+ self._host = host
+ self._port = port
+ self._secure = secure
+ self._root_certificates = root_certificates
+ self._private_key = private_key
+ self._certificate_chain = certificate_chain
+ self._server_host_override = server_host_override
+
+ self._pool = None
+ self._front = None
+ self._rear_link = None
+ self._understub = None
+
+ def __enter__(self):
+ with self._lock:
+ if self._pool is None:
+ self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
+ self._front = _tickets_implementations.front(
+ self._pool, self._pool, self._pool)
+ self._rear_link = _rear.RearLink(
+ self._host, self._port, self._pool,
+ self._breakdown.request_serializers,
+ self._breakdown.response_deserializers, self._secure,
+ self._root_certificates, self._private_key, self._certificate_chain,
+ server_host_override=self._server_host_override)
+ self._front.join_rear_link(self._rear_link)
+ self._rear_link.join_fore_link(self._front)
+ self._rear_link.start()
+ self._understub = _face_implementations.dynamic_stub(
+ _reexport.common_cardinalities(self._breakdown.cardinalities),
+ self._front, self._pool, '')
+ else:
+ raise ValueError('Tried to __enter__ already-__enter__ed Stub!')
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ with self._lock:
+ if self._pool is None:
+ raise ValueError('Tried to __exit__ non-__enter__ed Stub!')
+ else:
+ self._rear_link.stop()
+ _base_utilities.wait_for_idle(self._front)
+ self._pool.shutdown(wait=True)
+ self._rear_link = None
+ self._front = None
+ self._pool = None
+ self._understub = None
+ return False
+
+ def __getattr__(self, attr):
+ with self._lock:
+ if self._pool is None:
+ raise ValueError('Tried to __getattr__ non-__enter__ed Stub!')
+ else:
+ underlying_attr = getattr(self._understub, attr, None)
+ method_cardinality = self._breakdown.cardinalities.get(attr)
+ # TODO(nathaniel): Eliminate this trick.
+ if underlying_attr is None:
+ for method_name, method_cardinality in self._breakdown.cardinalities.iteritems():
+ last_slash_index = method_name.rfind('/')
+ if 0 <= last_slash_index and method_name[last_slash_index + 1:] == attr:
+ underlying_attr = getattr(self._understub, method_name)
+ break
+ else:
+ raise AttributeError(attr)
+ if method_cardinality is interfaces.Cardinality.UNARY_UNARY:
+ return _reexport.unary_unary_sync_async(underlying_attr)
+ elif method_cardinality is interfaces.Cardinality.UNARY_STREAM:
+ return lambda request, timeout: _reexport.cancellable_iterator(
+ underlying_attr(request, timeout))
+ elif method_cardinality is interfaces.Cardinality.STREAM_UNARY:
+ return _reexport.stream_unary_sync_async(underlying_attr)
+ elif method_cardinality is interfaces.Cardinality.STREAM_STREAM:
+ return lambda request_iterator, timeout: (
+ _reexport.cancellable_iterator(underlying_attr(
+ request_iterator, timeout)))
+ else:
+ raise AttributeError(attr)
+
+
+def _build_stub(
+ methods, host, port, secure, root_certificates, private_key,
+ certificate_chain, server_host_override=None):
+ breakdown = _face_utilities.break_down_invocation(methods)
+ return _Stub(
+ breakdown, host, port, secure, root_certificates, private_key,
+ certificate_chain, server_host_override=server_host_override)
def _build_server(methods, port, private_key, certificate_chain):
@@ -118,11 +223,7 @@ def insecure_stub(methods, host, port):
Returns:
An interfaces.Stub affording RPC invocation.
"""
- breakdown = _face_utilities.break_down_invocation(methods)
- activated_rear_link = _rear.activated_rear_link(
- host, port, breakdown.request_serializers,
- breakdown.response_deserializers)
- return _build_stub(breakdown, activated_rear_link)
+ return _build_stub(methods, host, port, False, None, None, None)
def secure_stub(
@@ -148,12 +249,9 @@ def secure_stub(
Returns:
An interfaces.Stub affording RPC invocation.
"""
- breakdown = _face_utilities.break_down_invocation(methods)
- activated_rear_link = _rear.secure_activated_rear_link(
- host, port, breakdown.request_serializers,
- breakdown.response_deserializers, root_certificates, private_key,
+ return _build_stub(
+ methods, host, port, True, root_certificates, private_key,
certificate_chain, server_host_override=server_host_override)
- return _build_stub(breakdown, activated_rear_link)
def insecure_server(methods, port):
diff --git a/src/python/src/grpc/framework/assembly/__init__.py b/src/python/src/grpc/framework/assembly/__init__.py
deleted file mode 100644
index 7086519106..0000000000
--- a/src/python/src/grpc/framework/assembly/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/src/grpc/framework/assembly/implementations.py b/src/python/src/grpc/framework/assembly/implementations.py
deleted file mode 100644
index 24afcbeb6d..0000000000
--- a/src/python/src/grpc/framework/assembly/implementations.py
+++ /dev/null
@@ -1,264 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Implementations for assembling RPC framework values."""
-
-import threading
-
-# tickets_interfaces, face_interfaces, and activated are referenced from
-# specification in this module.
-from grpc.framework.assembly import interfaces
-from grpc.framework.base import util as base_utilities
-from grpc.framework.base.packets import implementations as tickets_implementations
-from grpc.framework.base.packets import interfaces as tickets_interfaces # pylint: disable=unused-import
-from grpc.framework.common import cardinality
-from grpc.framework.common import style
-from grpc.framework.face import implementations as face_implementations
-from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import
-from grpc.framework.face import utilities as face_utilities
-from grpc.framework.foundation import activated # pylint: disable=unused-import
-from grpc.framework.foundation import logging_pool
-
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
-_THREAD_POOL_SIZE = 100
-
-
-class _FaceStub(object):
-
- def __init__(self, rear_link):
- self._rear_link = rear_link
- self._lock = threading.Lock()
- self._pool = None
- self._front = None
- self._under_stub = None
-
- def __enter__(self):
- with self._lock:
- self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
- self._front = tickets_implementations.front(
- self._pool, self._pool, self._pool)
- self._rear_link.start()
- self._rear_link.join_fore_link(self._front)
- self._front.join_rear_link(self._rear_link)
- self._under_stub = face_implementations.generic_stub(self._front, self._pool)
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- with self._lock:
- self._under_stub = None
- self._rear_link.stop()
- base_utilities.wait_for_idle(self._front)
- self._front = None
- self._pool.shutdown(wait=True)
- self._pool = None
- return False
-
- def __getattr__(self, attr):
- with self._lock:
- if self._under_stub is None:
- raise ValueError('Called out of context!')
- else:
- return getattr(self._under_stub, attr)
-
-
-def _behaviors(method_cardinalities, front, pool):
- behaviors = {}
- stub = face_implementations.generic_stub(front, pool)
- for name, method_cardinality in method_cardinalities.iteritems():
- if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
- behaviors[name] = stub.unary_unary_multi_callable(name)
- elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
- behaviors[name] = lambda request, context, bound_name=name: (
- stub.inline_value_in_stream_out(bound_name, request, context))
- elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
- behaviors[name] = stub.stream_unary_multi_callable(name)
- elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
- behaviors[name] = lambda request_iterator, context, bound_name=name: (
- stub.inline_stream_in_stream_out(
- bound_name, request_iterator, context))
- return behaviors
-
-
-class _DynamicInlineStub(object):
-
- def __init__(self, cardinalities, rear_link):
- self._cardinalities = cardinalities
- self._rear_link = rear_link
- self._lock = threading.Lock()
- self._pool = None
- self._front = None
- self._behaviors = None
-
- def __enter__(self):
- with self._lock:
- self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
- self._front = tickets_implementations.front(
- self._pool, self._pool, self._pool)
- self._rear_link.start()
- self._rear_link.join_fore_link(self._front)
- self._front.join_rear_link(self._rear_link)
- self._behaviors = _behaviors(
- self._cardinalities, self._front, self._pool)
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- with self._lock:
- self._behaviors = None
- self._rear_link.stop()
- base_utilities.wait_for_idle(self._front)
- self._front = None
- self._pool.shutdown(wait=True)
- self._pool = None
- return False
-
- def __getattr__(self, attr):
- with self._lock:
- behavior = self._behaviors.get(attr)
- if behavior is None:
- for name, behavior in self._behaviors.iteritems():
- last_slash_index = name.rfind('/')
- if 0 <= last_slash_index and name[last_slash_index + 1:] == attr:
- return behavior
- else:
- raise AttributeError(
- '_DynamicInlineStub instance has no attribute "%s"!' % attr)
- else:
- return behavior
-
-
-class _ServiceAssembly(interfaces.Server):
-
- def __init__(self, implementations, fore_link):
- self._implementations = implementations
- self._fore_link = fore_link
- self._lock = threading.Lock()
- self._pool = None
- self._back = None
-
- def _start(self):
- with self._lock:
- self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
- servicer = face_implementations.servicer(
- self._pool, self._implementations, None)
- self._back = tickets_implementations.back(
- servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS,
- _ONE_DAY_IN_SECONDS)
- self._fore_link.start()
- self._fore_link.join_rear_link(self._back)
- self._back.join_fore_link(self._fore_link)
-
- def _stop(self):
- with self._lock:
- self._fore_link.stop()
- base_utilities.wait_for_idle(self._back)
- self._back = None
- self._pool.shutdown(wait=True)
- self._pool = None
-
- def __enter__(self):
- self._start()
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._stop()
- return False
-
- def start(self):
- return self._start()
-
- def stop(self):
- self._stop()
-
- def port(self):
- with self._lock:
- return self._fore_link.port()
-
-
-def assemble_face_stub(activated_rear_link):
- """Assembles a face_interfaces.GenericStub.
-
- The returned object is a context manager and may only be used in context to
- invoke RPCs.
-
- Args:
- activated_rear_link: An object that is both a tickets_interfaces.RearLink
- and an activated.Activated. The object should be in the inactive state
- when passed to this method.
-
- Returns:
- A face_interfaces.GenericStub on which, in context, RPCs can be invoked.
- """
- return _FaceStub(activated_rear_link)
-
-
-def assemble_dynamic_inline_stub(cardinalities, activated_rear_link):
- """Assembles a stub with method names for attributes.
-
- The returned object is a context manager and may only be used in context to
- invoke RPCs.
-
- The returned object, when used in context, will respond to attribute access
- as follows: if the requested attribute is the name of a unary-unary RPC
- method, the value of the attribute will be a
- face_interfaces.UnaryUnaryMultiCallable with which to invoke the RPC method.
- If the requested attribute is the name of a unary-stream RPC method, the
- value of the attribute will be a face_interfaces.UnaryStreamMultiCallable
- with which to invoke the RPC method. If the requested attribute is the name
- of a stream-unary RPC method, the value of the attribute will be a
- face_interfaces.StreamUnaryMultiCallable with which to invoke the RPC method.
- If the requested attribute is the name of a stream-stream RPC method, the
- value of the attribute will be a face_interfaces.StreamStreamMultiCallable
- with which to invoke the RPC method.
-
- Args:
- cardinalities: A dictionary from RPC method name to cardinality.Cardinality
- value identifying the cardinality of the named RPC method.
- activated_rear_link: An object that is both a tickets_interfaces.RearLink
- and an activated.Activated. The object should be in the inactive state
- when passed to this method.
-
- Returns:
- A face_interfaces.DynamicStub on which, in context, RPCs can be invoked.
- """
- return _DynamicInlineStub(cardinalities, activated_rear_link)
-
-
-def assemble_service(implementations, activated_fore_link):
- """Assembles the service-side of the RPC Framework stack.
-
- Args:
- implementations: A dictionary from RPC method name to
- face_interfaces.MethodImplementation.
- activated_fore_link: An object that is both a tickets_interfaces.ForeLink
- and an activated.Activated. The object should be in the inactive state
- when passed to this method.
-
- Returns:
- An interfaces.Server encapsulating RPC service.
- """
- return _ServiceAssembly(implementations, activated_fore_link)
diff --git a/src/python/src/grpc/framework/assembly/implementations_test.py b/src/python/src/grpc/framework/assembly/implementations_test.py
deleted file mode 100644
index 5540edff7a..0000000000
--- a/src/python/src/grpc/framework/assembly/implementations_test.py
+++ /dev/null
@@ -1,288 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-# TODO(nathaniel): Expand this test coverage.
-
-"""Test of the GRPC-backed ForeLink and RearLink."""
-
-import threading
-import unittest
-
-from grpc.framework.assembly import implementations
-from grpc.framework.base import interfaces
-from grpc.framework.base.packets import packets as tickets
-from grpc.framework.base.packets import interfaces as tickets_interfaces
-from grpc.framework.base.packets import null
-from grpc.framework.face import utilities as face_utilities
-from grpc.framework.foundation import logging_pool
-from grpc._junkdrawer import math_pb2
-
-DIV = 'Div'
-DIV_MANY = 'DivMany'
-FIB = 'Fib'
-SUM = 'Sum'
-
-def _fibbonacci(limit):
- left, right = 0, 1
- for _ in xrange(limit):
- yield left
- left, right = right, left + right
-
-
-def _div(request, unused_context):
- return math_pb2.DivReply(
- quotient=request.dividend / request.divisor,
- remainder=request.dividend % request.divisor)
-
-
-def _div_many(request_iterator, unused_context):
- for request in request_iterator:
- yield math_pb2.DivReply(
- quotient=request.dividend / request.divisor,
- remainder=request.dividend % request.divisor)
-
-
-def _fib(request, unused_context):
- for number in _fibbonacci(request.limit):
- yield math_pb2.Num(num=number)
-
-
-def _sum(request_iterator, unused_context):
- accumulation = 0
- for request in request_iterator:
- accumulation += request.num
- return math_pb2.Num(num=accumulation)
-
-
-_IMPLEMENTATIONS = {
- DIV: face_utilities.unary_unary_inline(_div),
- DIV_MANY: face_utilities.stream_stream_inline(_div_many),
- FIB: face_utilities.unary_stream_inline(_fib),
- SUM: face_utilities.stream_unary_inline(_sum),
-}
-
-_CARDINALITIES = {
- name: implementation.cardinality
- for name, implementation in _IMPLEMENTATIONS.iteritems()}
-
-_TIMEOUT = 10
-
-
-class PipeLink(tickets_interfaces.ForeLink, tickets_interfaces.RearLink):
-
- def __init__(self):
- self._fore_lock = threading.Lock()
- self._fore_link = null.NULL_FORE_LINK
- self._rear_lock = threading.Lock()
- self._rear_link = null.NULL_REAR_LINK
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- return False
-
- def start(self):
- pass
-
- def stop(self):
- pass
-
- def accept_back_to_front_ticket(self, ticket):
- with self._fore_lock:
- self._fore_link.accept_back_to_front_ticket(ticket)
-
- def join_rear_link(self, rear_link):
- with self._rear_lock:
- self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
-
- def accept_front_to_back_ticket(self, ticket):
- with self._rear_lock:
- self._rear_link.accept_front_to_back_ticket(ticket)
-
- def join_fore_link(self, fore_link):
- with self._fore_lock:
- self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
-
-
-class FaceStubTest(unittest.TestCase):
-
- def testUnaryUnary(self):
- divisor = 7
- dividend = 13
- expected_quotient = 1
- expected_remainder = 6
- pipe = PipeLink()
- service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
- face_stub = implementations.assemble_face_stub(pipe)
-
- service.start()
- try:
- with face_stub:
- response = face_stub.blocking_value_in_value_out(
- DIV, math_pb2.DivArgs(divisor=divisor, dividend=dividend),
- _TIMEOUT)
- self.assertEqual(expected_quotient, response.quotient)
- self.assertEqual(expected_remainder, response.remainder)
- finally:
- service.stop()
-
- def testUnaryStream(self):
- stream_length = 29
- pipe = PipeLink()
- service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
- face_stub = implementations.assemble_face_stub(pipe)
-
- with service, face_stub:
- responses = list(
- face_stub.inline_value_in_stream_out(
- FIB, math_pb2.FibArgs(limit=stream_length), _TIMEOUT))
- numbers = [response.num for response in responses]
- for early, middle, later in zip(numbers, numbers[1:], numbers[2:]):
- self.assertEqual(early + middle, later)
-
- def testStreamUnary(self):
- stream_length = 13
- pipe = PipeLink()
- service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
- face_stub = implementations.assemble_face_stub(pipe)
-
- with service, face_stub:
- multi_callable = face_stub.stream_unary_multi_callable(SUM)
- response_future = multi_callable.future(
- (math_pb2.Num(num=index) for index in range(stream_length)),
- _TIMEOUT)
- self.assertEqual(
- (stream_length * (stream_length - 1)) / 2,
- response_future.result().num)
-
- def testStreamStream(self):
- stream_length = 17
- divisor_offset = 7
- dividend_offset = 17
- pipe = PipeLink()
- service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
- face_stub = implementations.assemble_face_stub(pipe)
-
- with service, face_stub:
- response_iterator = face_stub.inline_stream_in_stream_out(
- DIV_MANY,
- (math_pb2.DivArgs(
- divisor=divisor_offset + index,
- dividend=dividend_offset + index)
- for index in range(stream_length)),
- _TIMEOUT)
- for index, response in enumerate(response_iterator):
- self.assertEqual(
- (dividend_offset + index) / (divisor_offset + index),
- response.quotient)
- self.assertEqual(
- (dividend_offset + index) % (divisor_offset + index),
- response.remainder)
- self.assertEqual(stream_length, index + 1)
-
-
-class DynamicInlineStubTest(unittest.TestCase):
-
- def testUnaryUnary(self):
- divisor = 59
- dividend = 973
- expected_quotient = dividend / divisor
- expected_remainder = dividend % divisor
- pipe = PipeLink()
- service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
- dynamic_stub = implementations.assemble_dynamic_inline_stub(
- _CARDINALITIES, pipe)
-
- service.start()
- with dynamic_stub:
- response = dynamic_stub.Div(
- math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT)
- self.assertEqual(expected_quotient, response.quotient)
- self.assertEqual(expected_remainder, response.remainder)
- service.stop()
-
- def testUnaryStream(self):
- stream_length = 43
- pipe = PipeLink()
- service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
- dynamic_stub = implementations.assemble_dynamic_inline_stub(
- _CARDINALITIES, pipe)
-
- with service, dynamic_stub:
- response_iterator = dynamic_stub.Fib(
- math_pb2.FibArgs(limit=stream_length), _TIMEOUT)
- numbers = tuple(response.num for response in response_iterator)
- for early, middle, later in zip(numbers, numbers[:1], numbers[:2]):
- self.assertEqual(early + middle, later)
- self.assertEqual(stream_length, len(numbers))
-
- def testStreamUnary(self):
- stream_length = 127
- pipe = PipeLink()
- service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
- dynamic_stub = implementations.assemble_dynamic_inline_stub(
- _CARDINALITIES, pipe)
-
- with service, dynamic_stub:
- response_future = dynamic_stub.Sum.future(
- (math_pb2.Num(num=index) for index in range(stream_length)),
- _TIMEOUT)
- self.assertEqual(
- (stream_length * (stream_length - 1)) / 2,
- response_future.result().num)
-
- def testStreamStream(self):
- stream_length = 179
- divisor_offset = 71
- dividend_offset = 1763
- pipe = PipeLink()
- service = implementations.assemble_service(_IMPLEMENTATIONS, pipe)
- dynamic_stub = implementations.assemble_dynamic_inline_stub(
- _CARDINALITIES, pipe)
-
- with service, dynamic_stub:
- response_iterator = dynamic_stub.DivMany(
- (math_pb2.DivArgs(
- divisor=divisor_offset + index,
- dividend=dividend_offset + index)
- for index in range(stream_length)),
- _TIMEOUT)
- for index, response in enumerate(response_iterator):
- self.assertEqual(
- (dividend_offset + index) / (divisor_offset + index),
- response.quotient)
- self.assertEqual(
- (dividend_offset + index) % (divisor_offset + index),
- response.remainder)
- self.assertEqual(stream_length, index + 1)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/src/python/src/grpc/framework/assembly/interfaces.py b/src/python/src/grpc/framework/assembly/interfaces.py
deleted file mode 100644
index c469dc4fd2..0000000000
--- a/src/python/src/grpc/framework/assembly/interfaces.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-# TODO(nathaniel): The assembly layer only exists to smooth out wrinkles in
-# the face layer. The two should be squashed together as soon as manageable.
-"""Interfaces for assembling RPC Framework values."""
-
-import abc
-
-from grpc.framework.foundation import activated
-
-
-class Server(activated.Activated):
- """The server interface.
-
- Aside from being able to be activated and deactivated, objects of this type
- are able to report the port on which they are servicing RPCs.
- """
- __metaclass__ = abc.ABCMeta
-
- # TODO(issue 726): This is an abstraction violation; not every Server is
- # necessarily serving over a network at all.
- @abc.abstractmethod
- def port(self):
- """Identifies the port on which this Server is servicing RPCs.
-
- This method may only be called while the server is active.
-
- Returns:
- The number of the port on which this Server is servicing RPCs.
- """
- raise NotImplementedError()
diff --git a/src/python/src/grpc/framework/face/implementations.py b/src/python/src/grpc/framework/face/implementations.py
index e8d91a3c91..4a6de52974 100644
--- a/src/python/src/grpc/framework/face/implementations.py
+++ b/src/python/src/grpc/framework/face/implementations.py
@@ -213,14 +213,14 @@ class _DynamicStub(interfaces.DynamicStub):
self._pool = pool
def __getattr__(self, attr):
- cardinality = self._cardinalities.get(attr)
- if cardinality is cardinality.Cardinality.UNARY_UNARY:
+ method_cardinality = self._cardinalities.get(attr)
+ if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
return _UnaryUnaryMultiCallable(self._front, attr)
- elif cardinality is cardinality.Cardinality.UNARY_STREAM:
+ elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
return _UnaryStreamMultiCallable(self._front, attr)
- elif cardinality is cardinality.Cardinality.STREAM_UNARY:
+ elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
return _StreamUnaryMultiCallable(self._front, attr, self._pool)
- elif cardinality is cardinality.Cardinality.STREAM_STREAM:
+ elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
return _StreamStreamMultiCallable(self._front, attr, self._pool)
else:
raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr)
@@ -315,4 +315,4 @@ def dynamic_stub(cardinalities, front, pool, prefix):
An interfaces.DynamicStub that performs RPCs via the given
base_interfaces.Front.
"""
- return _DynamicStub(cardinalities, front, pool, prefix)
+ return _DynamicStub(cardinalities, front, pool)
diff --git a/src/python/src/setup.py b/src/python/src/setup.py
index cdb82a9dc3..a513a2811b 100644
--- a/src/python/src/setup.py
+++ b/src/python/src/setup.py
@@ -64,7 +64,6 @@ _PACKAGES = (
'grpc._junkdrawer',
'grpc.early_adopter',
'grpc.framework',
- 'grpc.framework.assembly',
'grpc.framework.base',
'grpc.framework.base.packets',
'grpc.framework.common',