diff options
author | Nathaniel Manista <nathaniel@google.com> | 2015-03-10 00:52:32 +0000 |
---|---|---|
committer | Nathaniel Manista <nathaniel@google.com> | 2015-03-10 00:52:32 +0000 |
commit | 31e65beaed4804c4c3926ae846286871077b39d2 (patch) | |
tree | cb5a0661b70bd83f26140330b29bd41b9ae9c3a1 /src/python | |
parent | f9007045f25b5886b71f69fbbc23d5078f5f7466 (diff) |
Eliminate the Python "assembly" package
This completes issue #726. All that cascading activation stuff just
didn't work out as cleanly as I had hoped it would.
Diffstat (limited to 'src/python')
-rw-r--r-- | src/python/src/grpc/_adapter/fore.py | 87 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/rear.py | 124 | ||||
-rw-r--r-- | src/python/src/grpc/early_adopter/_reexport.py | 51 | ||||
-rw-r--r-- | src/python/src/grpc/early_adopter/implementations.py | 152 | ||||
-rw-r--r-- | src/python/src/grpc/framework/assembly/__init__.py | 30 | ||||
-rw-r--r-- | src/python/src/grpc/framework/assembly/implementations.py | 264 | ||||
-rw-r--r-- | src/python/src/grpc/framework/assembly/implementations_test.py | 288 | ||||
-rw-r--r-- | src/python/src/grpc/framework/assembly/interfaces.py | 58 | ||||
-rw-r--r-- | src/python/src/grpc/framework/face/implementations.py | 12 | ||||
-rw-r--r-- | src/python/src/setup.py | 1 |
10 files changed, 141 insertions, 926 deletions
diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py index 6ef9e60006..339c0ef216 100644 --- a/src/python/src/grpc/_adapter/fore.py +++ b/src/python/src/grpc/_adapter/fore.py @@ -357,90 +357,3 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated): self._complete(ticket.operation_id, ticket.payload) else: self._cancel(ticket.operation_id) - - -class _ActivatedForeLink(ticket_interfaces.ForeLink, activated.Activated): - - def __init__( - self, port, request_deserializers, response_serializers, - root_certificates, key_chain_pairs): - self._port = port - self._request_deserializers = request_deserializers - self._response_serializers = response_serializers - self._root_certificates = root_certificates - self._key_chain_pairs = key_chain_pairs - - self._lock = threading.Lock() - self._pool = None - self._fore_link = None - self._rear_link = null.NULL_REAR_LINK - - def join_rear_link(self, rear_link): - with self._lock: - self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link - if self._fore_link is not None: - self._fore_link.join_rear_link(rear_link) - - def _start(self): - with self._lock: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - self._fore_link = ForeLink( - self._pool, self._request_deserializers, self._response_serializers, - self._root_certificates, self._key_chain_pairs, port=self._port) - self._fore_link.join_rear_link(self._rear_link) - self._fore_link.start() - return self - - def _stop(self): - with self._lock: - self._fore_link.stop() - self._fore_link = None - self._pool.shutdown(wait=True) - self._pool = None - - def __enter__(self): - return self._start() - - def __exit__(self, exc_type, exc_val, exc_tb): - self._stop() - return False - - def start(self): - return self._start() - - def stop(self): - self._stop() - - def port(self): - with self._lock: - return None if self._fore_link is None else self._fore_link.port() - - def accept_back_to_front_ticket(self, ticket): - with self._lock: - if self._fore_link is not None: - self._fore_link.accept_back_to_front_ticket(ticket) - - -def activated_fore_link( - port, request_deserializers, response_serializers, root_certificates, - key_chain_pairs): - """Creates a ForeLink that is also an activated.Activated. - - The returned object is only valid for use between calls to its start and stop - methods (or in context when used as a context manager). - - Args: - port: The port on which to serve RPCs, or None for a port to be - automatically selected. - request_deserializers: A dictionary from RPC method names to request object - deserializer behaviors. - response_serializers: A dictionary from RPC method names to response object - serializer behaviors. - root_certificates: The PEM-encoded client root certificates as a bytestring - or None. - key_chain_pairs: A sequence of PEM-encoded private key-certificate chain - pairs. - """ - return _ActivatedForeLink( - port, request_deserializers, response_serializers, root_certificates, - key_chain_pairs) diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py index fc71bf0a6c..62703fab30 100644 --- a/src/python/src/grpc/_adapter/rear.py +++ b/src/python/src/grpc/_adapter/rear.py @@ -387,127 +387,3 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): else: # NOTE(nathaniel): All other categories are treated as cancellation. self._cancel(ticket.operation_id) - - -class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated): - - def __init__( - self, host, port, request_serializers, response_deserializers, secure, - root_certificates, private_key, certificate_chain, - server_host_override=None): - self._host = host - self._port = port - self._request_serializers = request_serializers - self._response_deserializers = response_deserializers - self._secure = secure - self._root_certificates = root_certificates - self._private_key = private_key - self._certificate_chain = certificate_chain - self._server_host_override = server_host_override - - self._lock = threading.Lock() - self._pool = None - self._rear_link = None - self._fore_link = null.NULL_FORE_LINK - - def join_fore_link(self, fore_link): - with self._lock: - self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link - if self._rear_link is not None: - self._rear_link.join_fore_link(self._fore_link) - - def _start(self): - with self._lock: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - self._rear_link = RearLink( - self._host, self._port, self._pool, self._request_serializers, - self._response_deserializers, self._secure, self._root_certificates, - self._private_key, self._certificate_chain, - server_host_override=self._server_host_override) - self._rear_link.join_fore_link(self._fore_link) - self._rear_link.start() - return self - - def _stop(self): - with self._lock: - self._rear_link.stop() - self._rear_link = None - self._pool.shutdown(wait=True) - self._pool = None - - def __enter__(self): - return self._start() - - def __exit__(self, exc_type, exc_val, exc_tb): - self._stop() - return False - - def start(self): - return self._start() - - def stop(self): - self._stop() - - def accept_front_to_back_ticket(self, ticket): - with self._lock: - if self._rear_link is not None: - self._rear_link.accept_front_to_back_ticket(ticket) - - -# TODO(issue 726): reconcile these two creation functions. -def activated_rear_link( - host, port, request_serializers, response_deserializers): - """Creates a RearLink that is also an activated.Activated. - - The returned object is only valid for use between calls to its start and stop - methods (or in context when used as a context manager). - - Args: - host: The host to which to connect for RPC service. - port: The port to which to connect for RPC service. - request_serializers: A dictionary from RPC method name to request object - serializer behavior. - response_deserializers: A dictionary from RPC method name to response - object deserializer behavior. - secure: A boolean indicating whether or not to use a secure connection. - root_certificates: The PEM-encoded root certificates or None to ask for - them to be retrieved from a default location. - private_key: The PEM-encoded private key to use or None if no private key - should be used. - certificate_chain: The PEM-encoded certificate chain to use or None if no - certificate chain should be used. - """ - return _ActivatedRearLink( - host, port, request_serializers, response_deserializers, False, None, - None, None) - - - -def secure_activated_rear_link( - host, port, request_serializers, response_deserializers, root_certificates, - private_key, certificate_chain, server_host_override=None): - """Creates a RearLink that is also an activated.Activated. - - The returned object is only valid for use between calls to its start and stop - methods (or in context when used as a context manager). - - Args: - host: The host to which to connect for RPC service. - port: The port to which to connect for RPC service. - request_serializers: A dictionary from RPC method name to request object - serializer behavior. - response_deserializers: A dictionary from RPC method name to response - object deserializer behavior. - root_certificates: The PEM-encoded root certificates or None to ask for - them to be retrieved from a default location. - private_key: The PEM-encoded private key to use or None if no private key - should be used. - certificate_chain: The PEM-encoded certificate chain to use or None if no - certificate chain should be used. - server_host_override: (For testing only) the target name used for SSL - host name checking. - """ - return _ActivatedRearLink( - host, port, request_serializers, response_deserializers, True, - root_certificates, private_key, certificate_chain, - server_host_override=server_host_override) diff --git a/src/python/src/grpc/early_adopter/_reexport.py b/src/python/src/grpc/early_adopter/_reexport.py index 3fed8099f6..f3416028e8 100644 --- a/src/python/src/grpc/early_adopter/_reexport.py +++ b/src/python/src/grpc/early_adopter/_reexport.py @@ -174,45 +174,6 @@ class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync): return _ReexportedFuture(self._underlying.future(request_iterator, timeout)) -class _Stub(interfaces.Stub): - - def __init__(self, assembly_stub, cardinalities): - self._assembly_stub = assembly_stub - self._cardinalities = cardinalities - - def __enter__(self): - self._assembly_stub.__enter__() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self._assembly_stub.__exit__(exc_type, exc_val, exc_tb) - return False - - def __getattr__(self, attr): - underlying_attr = self._assembly_stub.__getattr__(attr) - method_cardinality = self._cardinalities.get(attr) - # TODO(nathaniel): unify this trick with its other occurrence in the code. - if method_cardinality is None: - for name, method_cardinality in self._cardinalities.iteritems(): - last_slash_index = name.rfind('/') - if 0 <= last_slash_index and name[last_slash_index + 1:] == attr: - break - else: - raise AttributeError(attr) - if method_cardinality is interfaces.Cardinality.UNARY_UNARY: - return _UnaryUnarySyncAsync(underlying_attr) - elif method_cardinality is interfaces.Cardinality.UNARY_STREAM: - return lambda request, timeout: _CancellableIterator( - underlying_attr(request, timeout)) - elif method_cardinality is interfaces.Cardinality.STREAM_UNARY: - return _StreamUnarySyncAsync(underlying_attr) - elif method_cardinality is interfaces.Cardinality.STREAM_STREAM: - return lambda request_iterator, timeout: _CancellableIterator( - underlying_attr(request_iterator, timeout)) - else: - raise AttributeError(attr) - - def common_cardinalities(early_adopter_cardinalities): common_cardinalities = {} for name, early_adopter_cardinality in early_adopter_cardinalities.iteritems(): @@ -225,5 +186,13 @@ def rpc_context(face_rpc_context): return _RpcContext(face_rpc_context) -def stub(face_stub, cardinalities): - return _Stub(face_stub, cardinalities) +def cancellable_iterator(face_cancellable_iterator): + return _CancellableIterator(face_cancellable_iterator) + + +def unary_unary_sync_async(face_unary_unary_multi_callable): + return _UnaryUnarySyncAsync(face_unary_unary_multi_callable) + + +def stream_unary_sync_async(face_stream_unary_multi_callable): + return _StreamUnarySyncAsync(face_stream_unary_multi_callable) diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py index b46f94e9fb..1c02f9e4d6 100644 --- a/src/python/src/grpc/early_adopter/implementations.py +++ b/src/python/src/grpc/early_adopter/implementations.py @@ -36,7 +36,13 @@ from grpc._adapter import rear as _rear from grpc.early_adopter import _face_utilities from grpc.early_adopter import _reexport from grpc.early_adopter import interfaces -from grpc.framework.assembly import implementations as _assembly_implementations +from grpc.framework.base import util as _base_utilities +from grpc.framework.base.packets import implementations as _tickets_implementations +from grpc.framework.face import implementations as _face_implementations +from grpc.framework.foundation import logging_pool + +_THREAD_POOL_SIZE = 80 +_ONE_DAY_IN_SECONDS = 24 * 60 * 60 class _Server(interfaces.Server): @@ -50,30 +56,39 @@ class _Server(interfaces.Server): else: self._key_chain_pairs = ((private_key, certificate_chain),) + self._pool = None + self._back = None self._fore_link = None - self._server = None def _start(self): with self._lock: - if self._server is None: - self._fore_link = _fore.activated_fore_link( - self._port, self._breakdown.request_deserializers, + if self._pool is None: + self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + servicer = _face_implementations.servicer( + self._pool, self._breakdown.implementations, None) + self._back = _tickets_implementations.back( + servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS, + _ONE_DAY_IN_SECONDS) + self._fore_link = _fore.ForeLink( + self._pool, self._breakdown.request_deserializers, self._breakdown.response_serializers, None, self._key_chain_pairs) - - self._server = _assembly_implementations.assemble_service( - self._breakdown.implementations, self._fore_link) - self._server.start() + self._back.join_fore_link(self._fore_link) + self._fore_link.join_rear_link(self._back) + self._fore_link.start() else: raise ValueError('Server currently running!') def _stop(self): with self._lock: - if self._server is None: + if self._pool is None: raise ValueError('Server not running!') else: - self._server.stop() - self._server = None + self._fore_link.stop() + _base_utilities.wait_for_idle(self._back) + self._pool.shutdown(wait=True) self._fore_link = None + self._back = None + self._pool = None def __enter__(self): self._start() @@ -93,11 +108,101 @@ class _Server(interfaces.Server): with self._lock: return self._fore_link.port() -def _build_stub(breakdown, activated_rear_link): - assembly_stub = _assembly_implementations.assemble_dynamic_inline_stub( - _reexport.common_cardinalities(breakdown.cardinalities), - activated_rear_link) - return _reexport.stub(assembly_stub, breakdown.cardinalities) + +class _Stub(interfaces.Stub): + + def __init__( + self, breakdown, host, port, secure, root_certificates, private_key, + certificate_chain, server_host_override=None): + self._lock = threading.Lock() + self._breakdown = breakdown + self._host = host + self._port = port + self._secure = secure + self._root_certificates = root_certificates + self._private_key = private_key + self._certificate_chain = certificate_chain + self._server_host_override = server_host_override + + self._pool = None + self._front = None + self._rear_link = None + self._understub = None + + def __enter__(self): + with self._lock: + if self._pool is None: + self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + self._front = _tickets_implementations.front( + self._pool, self._pool, self._pool) + self._rear_link = _rear.RearLink( + self._host, self._port, self._pool, + self._breakdown.request_serializers, + self._breakdown.response_deserializers, self._secure, + self._root_certificates, self._private_key, self._certificate_chain, + server_host_override=self._server_host_override) + self._front.join_rear_link(self._rear_link) + self._rear_link.join_fore_link(self._front) + self._rear_link.start() + self._understub = _face_implementations.dynamic_stub( + _reexport.common_cardinalities(self._breakdown.cardinalities), + self._front, self._pool, '') + else: + raise ValueError('Tried to __enter__ already-__enter__ed Stub!') + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + with self._lock: + if self._pool is None: + raise ValueError('Tried to __exit__ non-__enter__ed Stub!') + else: + self._rear_link.stop() + _base_utilities.wait_for_idle(self._front) + self._pool.shutdown(wait=True) + self._rear_link = None + self._front = None + self._pool = None + self._understub = None + return False + + def __getattr__(self, attr): + with self._lock: + if self._pool is None: + raise ValueError('Tried to __getattr__ non-__enter__ed Stub!') + else: + underlying_attr = getattr(self._understub, attr, None) + method_cardinality = self._breakdown.cardinalities.get(attr) + # TODO(nathaniel): Eliminate this trick. + if underlying_attr is None: + for method_name, method_cardinality in self._breakdown.cardinalities.iteritems(): + last_slash_index = method_name.rfind('/') + if 0 <= last_slash_index and method_name[last_slash_index + 1:] == attr: + underlying_attr = getattr(self._understub, method_name) + break + else: + raise AttributeError(attr) + if method_cardinality is interfaces.Cardinality.UNARY_UNARY: + return _reexport.unary_unary_sync_async(underlying_attr) + elif method_cardinality is interfaces.Cardinality.UNARY_STREAM: + return lambda request, timeout: _reexport.cancellable_iterator( + underlying_attr(request, timeout)) + elif method_cardinality is interfaces.Cardinality.STREAM_UNARY: + return _reexport.stream_unary_sync_async(underlying_attr) + elif method_cardinality is interfaces.Cardinality.STREAM_STREAM: + return lambda request_iterator, timeout: ( + _reexport.cancellable_iterator(underlying_attr( + request_iterator, timeout))) + else: + raise AttributeError(attr) + + +def _build_stub( + methods, host, port, secure, root_certificates, private_key, + certificate_chain, server_host_override=None): + breakdown = _face_utilities.break_down_invocation(methods) + return _Stub( + breakdown, host, port, secure, root_certificates, private_key, + certificate_chain, server_host_override=server_host_override) def _build_server(methods, port, private_key, certificate_chain): @@ -118,11 +223,7 @@ def insecure_stub(methods, host, port): Returns: An interfaces.Stub affording RPC invocation. """ - breakdown = _face_utilities.break_down_invocation(methods) - activated_rear_link = _rear.activated_rear_link( - host, port, breakdown.request_serializers, - breakdown.response_deserializers) - return _build_stub(breakdown, activated_rear_link) + return _build_stub(methods, host, port, False, None, None, None) def secure_stub( @@ -148,12 +249,9 @@ def secure_stub( Returns: An interfaces.Stub affording RPC invocation. """ - breakdown = _face_utilities.break_down_invocation(methods) - activated_rear_link = _rear.secure_activated_rear_link( - host, port, breakdown.request_serializers, - breakdown.response_deserializers, root_certificates, private_key, + return _build_stub( + methods, host, port, True, root_certificates, private_key, certificate_chain, server_host_override=server_host_override) - return _build_stub(breakdown, activated_rear_link) def insecure_server(methods, port): diff --git a/src/python/src/grpc/framework/assembly/__init__.py b/src/python/src/grpc/framework/assembly/__init__.py deleted file mode 100644 index 7086519106..0000000000 --- a/src/python/src/grpc/framework/assembly/__init__.py +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - diff --git a/src/python/src/grpc/framework/assembly/implementations.py b/src/python/src/grpc/framework/assembly/implementations.py deleted file mode 100644 index 24afcbeb6d..0000000000 --- a/src/python/src/grpc/framework/assembly/implementations.py +++ /dev/null @@ -1,264 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Implementations for assembling RPC framework values.""" - -import threading - -# tickets_interfaces, face_interfaces, and activated are referenced from -# specification in this module. -from grpc.framework.assembly import interfaces -from grpc.framework.base import util as base_utilities -from grpc.framework.base.packets import implementations as tickets_implementations -from grpc.framework.base.packets import interfaces as tickets_interfaces # pylint: disable=unused-import -from grpc.framework.common import cardinality -from grpc.framework.common import style -from grpc.framework.face import implementations as face_implementations -from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import -from grpc.framework.face import utilities as face_utilities -from grpc.framework.foundation import activated # pylint: disable=unused-import -from grpc.framework.foundation import logging_pool - -_ONE_DAY_IN_SECONDS = 60 * 60 * 24 -_THREAD_POOL_SIZE = 100 - - -class _FaceStub(object): - - def __init__(self, rear_link): - self._rear_link = rear_link - self._lock = threading.Lock() - self._pool = None - self._front = None - self._under_stub = None - - def __enter__(self): - with self._lock: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - self._front = tickets_implementations.front( - self._pool, self._pool, self._pool) - self._rear_link.start() - self._rear_link.join_fore_link(self._front) - self._front.join_rear_link(self._rear_link) - self._under_stub = face_implementations.generic_stub(self._front, self._pool) - - def __exit__(self, exc_type, exc_val, exc_tb): - with self._lock: - self._under_stub = None - self._rear_link.stop() - base_utilities.wait_for_idle(self._front) - self._front = None - self._pool.shutdown(wait=True) - self._pool = None - return False - - def __getattr__(self, attr): - with self._lock: - if self._under_stub is None: - raise ValueError('Called out of context!') - else: - return getattr(self._under_stub, attr) - - -def _behaviors(method_cardinalities, front, pool): - behaviors = {} - stub = face_implementations.generic_stub(front, pool) - for name, method_cardinality in method_cardinalities.iteritems(): - if method_cardinality is cardinality.Cardinality.UNARY_UNARY: - behaviors[name] = stub.unary_unary_multi_callable(name) - elif method_cardinality is cardinality.Cardinality.UNARY_STREAM: - behaviors[name] = lambda request, context, bound_name=name: ( - stub.inline_value_in_stream_out(bound_name, request, context)) - elif method_cardinality is cardinality.Cardinality.STREAM_UNARY: - behaviors[name] = stub.stream_unary_multi_callable(name) - elif method_cardinality is cardinality.Cardinality.STREAM_STREAM: - behaviors[name] = lambda request_iterator, context, bound_name=name: ( - stub.inline_stream_in_stream_out( - bound_name, request_iterator, context)) - return behaviors - - -class _DynamicInlineStub(object): - - def __init__(self, cardinalities, rear_link): - self._cardinalities = cardinalities - self._rear_link = rear_link - self._lock = threading.Lock() - self._pool = None - self._front = None - self._behaviors = None - - def __enter__(self): - with self._lock: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - self._front = tickets_implementations.front( - self._pool, self._pool, self._pool) - self._rear_link.start() - self._rear_link.join_fore_link(self._front) - self._front.join_rear_link(self._rear_link) - self._behaviors = _behaviors( - self._cardinalities, self._front, self._pool) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - with self._lock: - self._behaviors = None - self._rear_link.stop() - base_utilities.wait_for_idle(self._front) - self._front = None - self._pool.shutdown(wait=True) - self._pool = None - return False - - def __getattr__(self, attr): - with self._lock: - behavior = self._behaviors.get(attr) - if behavior is None: - for name, behavior in self._behaviors.iteritems(): - last_slash_index = name.rfind('/') - if 0 <= last_slash_index and name[last_slash_index + 1:] == attr: - return behavior - else: - raise AttributeError( - '_DynamicInlineStub instance has no attribute "%s"!' % attr) - else: - return behavior - - -class _ServiceAssembly(interfaces.Server): - - def __init__(self, implementations, fore_link): - self._implementations = implementations - self._fore_link = fore_link - self._lock = threading.Lock() - self._pool = None - self._back = None - - def _start(self): - with self._lock: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - servicer = face_implementations.servicer( - self._pool, self._implementations, None) - self._back = tickets_implementations.back( - servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS, - _ONE_DAY_IN_SECONDS) - self._fore_link.start() - self._fore_link.join_rear_link(self._back) - self._back.join_fore_link(self._fore_link) - - def _stop(self): - with self._lock: - self._fore_link.stop() - base_utilities.wait_for_idle(self._back) - self._back = None - self._pool.shutdown(wait=True) - self._pool = None - - def __enter__(self): - self._start() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self._stop() - return False - - def start(self): - return self._start() - - def stop(self): - self._stop() - - def port(self): - with self._lock: - return self._fore_link.port() - - -def assemble_face_stub(activated_rear_link): - """Assembles a face_interfaces.GenericStub. - - The returned object is a context manager and may only be used in context to - invoke RPCs. - - Args: - activated_rear_link: An object that is both a tickets_interfaces.RearLink - and an activated.Activated. The object should be in the inactive state - when passed to this method. - - Returns: - A face_interfaces.GenericStub on which, in context, RPCs can be invoked. - """ - return _FaceStub(activated_rear_link) - - -def assemble_dynamic_inline_stub(cardinalities, activated_rear_link): - """Assembles a stub with method names for attributes. - - The returned object is a context manager and may only be used in context to - invoke RPCs. - - The returned object, when used in context, will respond to attribute access - as follows: if the requested attribute is the name of a unary-unary RPC - method, the value of the attribute will be a - face_interfaces.UnaryUnaryMultiCallable with which to invoke the RPC method. - If the requested attribute is the name of a unary-stream RPC method, the - value of the attribute will be a face_interfaces.UnaryStreamMultiCallable - with which to invoke the RPC method. If the requested attribute is the name - of a stream-unary RPC method, the value of the attribute will be a - face_interfaces.StreamUnaryMultiCallable with which to invoke the RPC method. - If the requested attribute is the name of a stream-stream RPC method, the - value of the attribute will be a face_interfaces.StreamStreamMultiCallable - with which to invoke the RPC method. - - Args: - cardinalities: A dictionary from RPC method name to cardinality.Cardinality - value identifying the cardinality of the named RPC method. - activated_rear_link: An object that is both a tickets_interfaces.RearLink - and an activated.Activated. The object should be in the inactive state - when passed to this method. - - Returns: - A face_interfaces.DynamicStub on which, in context, RPCs can be invoked. - """ - return _DynamicInlineStub(cardinalities, activated_rear_link) - - -def assemble_service(implementations, activated_fore_link): - """Assembles the service-side of the RPC Framework stack. - - Args: - implementations: A dictionary from RPC method name to - face_interfaces.MethodImplementation. - activated_fore_link: An object that is both a tickets_interfaces.ForeLink - and an activated.Activated. The object should be in the inactive state - when passed to this method. - - Returns: - An interfaces.Server encapsulating RPC service. - """ - return _ServiceAssembly(implementations, activated_fore_link) diff --git a/src/python/src/grpc/framework/assembly/implementations_test.py b/src/python/src/grpc/framework/assembly/implementations_test.py deleted file mode 100644 index 5540edff7a..0000000000 --- a/src/python/src/grpc/framework/assembly/implementations_test.py +++ /dev/null @@ -1,288 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -# TODO(nathaniel): Expand this test coverage. - -"""Test of the GRPC-backed ForeLink and RearLink.""" - -import threading -import unittest - -from grpc.framework.assembly import implementations -from grpc.framework.base import interfaces -from grpc.framework.base.packets import packets as tickets -from grpc.framework.base.packets import interfaces as tickets_interfaces -from grpc.framework.base.packets import null -from grpc.framework.face import utilities as face_utilities -from grpc.framework.foundation import logging_pool -from grpc._junkdrawer import math_pb2 - -DIV = 'Div' -DIV_MANY = 'DivMany' -FIB = 'Fib' -SUM = 'Sum' - -def _fibbonacci(limit): - left, right = 0, 1 - for _ in xrange(limit): - yield left - left, right = right, left + right - - -def _div(request, unused_context): - return math_pb2.DivReply( - quotient=request.dividend / request.divisor, - remainder=request.dividend % request.divisor) - - -def _div_many(request_iterator, unused_context): - for request in request_iterator: - yield math_pb2.DivReply( - quotient=request.dividend / request.divisor, - remainder=request.dividend % request.divisor) - - -def _fib(request, unused_context): - for number in _fibbonacci(request.limit): - yield math_pb2.Num(num=number) - - -def _sum(request_iterator, unused_context): - accumulation = 0 - for request in request_iterator: - accumulation += request.num - return math_pb2.Num(num=accumulation) - - -_IMPLEMENTATIONS = { - DIV: face_utilities.unary_unary_inline(_div), - DIV_MANY: face_utilities.stream_stream_inline(_div_many), - FIB: face_utilities.unary_stream_inline(_fib), - SUM: face_utilities.stream_unary_inline(_sum), -} - -_CARDINALITIES = { - name: implementation.cardinality - for name, implementation in _IMPLEMENTATIONS.iteritems()} - -_TIMEOUT = 10 - - -class PipeLink(tickets_interfaces.ForeLink, tickets_interfaces.RearLink): - - def __init__(self): - self._fore_lock = threading.Lock() - self._fore_link = null.NULL_FORE_LINK - self._rear_lock = threading.Lock() - self._rear_link = null.NULL_REAR_LINK - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - return False - - def start(self): - pass - - def stop(self): - pass - - def accept_back_to_front_ticket(self, ticket): - with self._fore_lock: - self._fore_link.accept_back_to_front_ticket(ticket) - - def join_rear_link(self, rear_link): - with self._rear_lock: - self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link - - def accept_front_to_back_ticket(self, ticket): - with self._rear_lock: - self._rear_link.accept_front_to_back_ticket(ticket) - - def join_fore_link(self, fore_link): - with self._fore_lock: - self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link - - -class FaceStubTest(unittest.TestCase): - - def testUnaryUnary(self): - divisor = 7 - dividend = 13 - expected_quotient = 1 - expected_remainder = 6 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - face_stub = implementations.assemble_face_stub(pipe) - - service.start() - try: - with face_stub: - response = face_stub.blocking_value_in_value_out( - DIV, math_pb2.DivArgs(divisor=divisor, dividend=dividend), - _TIMEOUT) - self.assertEqual(expected_quotient, response.quotient) - self.assertEqual(expected_remainder, response.remainder) - finally: - service.stop() - - def testUnaryStream(self): - stream_length = 29 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - face_stub = implementations.assemble_face_stub(pipe) - - with service, face_stub: - responses = list( - face_stub.inline_value_in_stream_out( - FIB, math_pb2.FibArgs(limit=stream_length), _TIMEOUT)) - numbers = [response.num for response in responses] - for early, middle, later in zip(numbers, numbers[1:], numbers[2:]): - self.assertEqual(early + middle, later) - - def testStreamUnary(self): - stream_length = 13 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - face_stub = implementations.assemble_face_stub(pipe) - - with service, face_stub: - multi_callable = face_stub.stream_unary_multi_callable(SUM) - response_future = multi_callable.future( - (math_pb2.Num(num=index) for index in range(stream_length)), - _TIMEOUT) - self.assertEqual( - (stream_length * (stream_length - 1)) / 2, - response_future.result().num) - - def testStreamStream(self): - stream_length = 17 - divisor_offset = 7 - dividend_offset = 17 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - face_stub = implementations.assemble_face_stub(pipe) - - with service, face_stub: - response_iterator = face_stub.inline_stream_in_stream_out( - DIV_MANY, - (math_pb2.DivArgs( - divisor=divisor_offset + index, - dividend=dividend_offset + index) - for index in range(stream_length)), - _TIMEOUT) - for index, response in enumerate(response_iterator): - self.assertEqual( - (dividend_offset + index) / (divisor_offset + index), - response.quotient) - self.assertEqual( - (dividend_offset + index) % (divisor_offset + index), - response.remainder) - self.assertEqual(stream_length, index + 1) - - -class DynamicInlineStubTest(unittest.TestCase): - - def testUnaryUnary(self): - divisor = 59 - dividend = 973 - expected_quotient = dividend / divisor - expected_remainder = dividend % divisor - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - dynamic_stub = implementations.assemble_dynamic_inline_stub( - _CARDINALITIES, pipe) - - service.start() - with dynamic_stub: - response = dynamic_stub.Div( - math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT) - self.assertEqual(expected_quotient, response.quotient) - self.assertEqual(expected_remainder, response.remainder) - service.stop() - - def testUnaryStream(self): - stream_length = 43 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - dynamic_stub = implementations.assemble_dynamic_inline_stub( - _CARDINALITIES, pipe) - - with service, dynamic_stub: - response_iterator = dynamic_stub.Fib( - math_pb2.FibArgs(limit=stream_length), _TIMEOUT) - numbers = tuple(response.num for response in response_iterator) - for early, middle, later in zip(numbers, numbers[:1], numbers[:2]): - self.assertEqual(early + middle, later) - self.assertEqual(stream_length, len(numbers)) - - def testStreamUnary(self): - stream_length = 127 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - dynamic_stub = implementations.assemble_dynamic_inline_stub( - _CARDINALITIES, pipe) - - with service, dynamic_stub: - response_future = dynamic_stub.Sum.future( - (math_pb2.Num(num=index) for index in range(stream_length)), - _TIMEOUT) - self.assertEqual( - (stream_length * (stream_length - 1)) / 2, - response_future.result().num) - - def testStreamStream(self): - stream_length = 179 - divisor_offset = 71 - dividend_offset = 1763 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - dynamic_stub = implementations.assemble_dynamic_inline_stub( - _CARDINALITIES, pipe) - - with service, dynamic_stub: - response_iterator = dynamic_stub.DivMany( - (math_pb2.DivArgs( - divisor=divisor_offset + index, - dividend=dividend_offset + index) - for index in range(stream_length)), - _TIMEOUT) - for index, response in enumerate(response_iterator): - self.assertEqual( - (dividend_offset + index) / (divisor_offset + index), - response.quotient) - self.assertEqual( - (dividend_offset + index) % (divisor_offset + index), - response.remainder) - self.assertEqual(stream_length, index + 1) - - -if __name__ == '__main__': - unittest.main() diff --git a/src/python/src/grpc/framework/assembly/interfaces.py b/src/python/src/grpc/framework/assembly/interfaces.py deleted file mode 100644 index c469dc4fd2..0000000000 --- a/src/python/src/grpc/framework/assembly/interfaces.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -# TODO(nathaniel): The assembly layer only exists to smooth out wrinkles in -# the face layer. The two should be squashed together as soon as manageable. -"""Interfaces for assembling RPC Framework values.""" - -import abc - -from grpc.framework.foundation import activated - - -class Server(activated.Activated): - """The server interface. - - Aside from being able to be activated and deactivated, objects of this type - are able to report the port on which they are servicing RPCs. - """ - __metaclass__ = abc.ABCMeta - - # TODO(issue 726): This is an abstraction violation; not every Server is - # necessarily serving over a network at all. - @abc.abstractmethod - def port(self): - """Identifies the port on which this Server is servicing RPCs. - - This method may only be called while the server is active. - - Returns: - The number of the port on which this Server is servicing RPCs. - """ - raise NotImplementedError() diff --git a/src/python/src/grpc/framework/face/implementations.py b/src/python/src/grpc/framework/face/implementations.py index e8d91a3c91..4a6de52974 100644 --- a/src/python/src/grpc/framework/face/implementations.py +++ b/src/python/src/grpc/framework/face/implementations.py @@ -213,14 +213,14 @@ class _DynamicStub(interfaces.DynamicStub): self._pool = pool def __getattr__(self, attr): - cardinality = self._cardinalities.get(attr) - if cardinality is cardinality.Cardinality.UNARY_UNARY: + method_cardinality = self._cardinalities.get(attr) + if method_cardinality is cardinality.Cardinality.UNARY_UNARY: return _UnaryUnaryMultiCallable(self._front, attr) - elif cardinality is cardinality.Cardinality.UNARY_STREAM: + elif method_cardinality is cardinality.Cardinality.UNARY_STREAM: return _UnaryStreamMultiCallable(self._front, attr) - elif cardinality is cardinality.Cardinality.STREAM_UNARY: + elif method_cardinality is cardinality.Cardinality.STREAM_UNARY: return _StreamUnaryMultiCallable(self._front, attr, self._pool) - elif cardinality is cardinality.Cardinality.STREAM_STREAM: + elif method_cardinality is cardinality.Cardinality.STREAM_STREAM: return _StreamStreamMultiCallable(self._front, attr, self._pool) else: raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr) @@ -315,4 +315,4 @@ def dynamic_stub(cardinalities, front, pool, prefix): An interfaces.DynamicStub that performs RPCs via the given base_interfaces.Front. """ - return _DynamicStub(cardinalities, front, pool, prefix) + return _DynamicStub(cardinalities, front, pool) diff --git a/src/python/src/setup.py b/src/python/src/setup.py index cdb82a9dc3..a513a2811b 100644 --- a/src/python/src/setup.py +++ b/src/python/src/setup.py @@ -64,7 +64,6 @@ _PACKAGES = ( 'grpc._junkdrawer', 'grpc.early_adopter', 'grpc.framework', - 'grpc.framework.assembly', 'grpc.framework.base', 'grpc.framework.base.packets', 'grpc.framework.common', |