aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiler/python_generator.cc16
-rw-r--r--src/python/grpcio/grpc/beta/_connectivity_channel.py16
-rw-r--r--src/python/grpcio/grpc/beta/_server.py6
-rw-r--r--src/python/grpcio/grpc/beta/implementations.py (renamed from src/python/grpcio/grpc/beta/beta.py)142
-rw-r--r--src/python/grpcio/grpc/beta/interfaces.py102
-rw-r--r--src/python/grpcio/grpc/beta/utilities.py21
-rw-r--r--src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py4
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_beta_features_test.py17
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py59
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_face_interface_test.py19
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_not_found_test.py8
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_utilities_test.py6
-rw-r--r--src/python/grpcio_test/grpc_test/beta/test_utilities.py12
13 files changed, 216 insertions, 212 deletions
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc
index fe2b9fad99..83133f2b6e 100644
--- a/src/compiler/python_generator.cc
+++ b/src/compiler/python_generator.cc
@@ -339,7 +339,7 @@ bool PrintAlphaServerFactory(const grpc::string& package_qualified_service_name,
}
out->Print("}\n");
out->Print(
- "return implementations.server("
+ "return early_adopter_implementations.server("
"\"$PackageQualifiedServiceName$\","
" method_service_descriptions, port, private_key=private_key,"
" certificate_chain=certificate_chain)\n",
@@ -422,7 +422,7 @@ bool PrintAlphaStubFactory(const grpc::string& package_qualified_service_name,
}
out->Print("}\n");
out->Print(
- "return implementations.stub("
+ "return early_adopter_implementations.stub("
"\"$PackageQualifiedServiceName$\","
" method_invocation_descriptions, host, port,"
" metadata_transformer=metadata_transformer, secure=secure,"
@@ -586,13 +586,13 @@ bool PrintBetaServerFactory(const grpc::string& package_qualified_service_name,
"Constructor", name_and_implementation_constructor->second);
}
out->Print("}\n");
- out->Print("server_options = beta.server_options("
+ out->Print("server_options = beta_implementations.server_options("
"request_deserializers=request_deserializers, "
"response_serializers=response_serializers, "
"thread_pool=pool, thread_pool_size=pool_size, "
"default_timeout=default_timeout, "
"maximum_timeout=maximum_timeout)\n");
- out->Print("return beta.server(method_implementations, "
+ out->Print("return beta_implementations.server(method_implementations, "
"options=server_options)\n");
}
return true;
@@ -685,13 +685,13 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
"Cardinality", name_and_cardinality->second);
}
out->Print("}\n");
- out->Print("stub_options = beta.stub_options("
+ out->Print("stub_options = beta_implementations.stub_options("
"host=host, metadata_transformer=metadata_transformer, "
"request_serializers=request_serializers, "
"response_deserializers=response_deserializers, "
"thread_pool=pool, thread_pool_size=pool_size)\n");
out->Print(
- "return beta.dynamic_stub(channel, \'$PackageQualifiedServiceName$\', "
+ "return beta_implementations.dynamic_stub(channel, \'$PackageQualifiedServiceName$\', "
"cardinalities, options=stub_options)\n",
"PackageQualifiedServiceName", package_qualified_service_name);
}
@@ -701,9 +701,9 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
bool PrintPreamble(const FileDescriptor* file,
const GeneratorConfiguration& config, Printer* out) {
out->Print("import abc\n");
- out->Print("from $Package$ import beta\n",
+ out->Print("from $Package$ import implementations as beta_implementations\n",
"Package", config.beta_package_root);
- out->Print("from $Package$ import implementations\n",
+ out->Print("from $Package$ import implementations as early_adopter_implementations\n",
"Package", config.early_adopter_package_root);
out->Print("from grpc.framework.alpha import utilities as alpha_utilities\n");
out->Print("from grpc.framework.common import cardinality\n");
diff --git a/src/python/grpcio/grpc/beta/_connectivity_channel.py b/src/python/grpcio/grpc/beta/_connectivity_channel.py
index 457ede79f2..61674a70ad 100644
--- a/src/python/grpcio/grpc/beta/_connectivity_channel.py
+++ b/src/python/grpcio/grpc/beta/_connectivity_channel.py
@@ -33,18 +33,24 @@ import threading
import time
from grpc._adapter import _low
+from grpc._adapter import _types
+from grpc.beta import interfaces
from grpc.framework.foundation import callable_util
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')
+_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
+ state: connectivity for state, connectivity in zip(
+ _types.ConnectivityState, interfaces.ChannelConnectivity)
+}
+
class ConnectivityChannel(object):
- def __init__(self, low_channel, mapping):
+ def __init__(self, low_channel):
self._lock = threading.Lock()
self._low_channel = low_channel
- self._mapping = mapping
self._polling = False
self._connectivity = None
@@ -88,7 +94,8 @@ class ConnectivityChannel(object):
try_to_connect = initial_try_to_connect
low_connectivity = low_channel.check_connectivity_state(try_to_connect)
with self._lock:
- self._connectivity = self._mapping[low_connectivity]
+ self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
+ low_connectivity]
callbacks = tuple(
callback for callback, unused_but_known_to_be_none_connectivity
in self._callbacks_and_connectivities)
@@ -112,7 +119,8 @@ class ConnectivityChannel(object):
if event.success or try_to_connect:
low_connectivity = low_channel.check_connectivity_state(try_to_connect)
with self._lock:
- self._connectivity = self._mapping[low_connectivity]
+ self._connectivity = _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[
+ low_connectivity]
if not self._delivering:
callbacks = self._deliveries(self._connectivity)
if callbacks:
diff --git a/src/python/grpcio/grpc/beta/_server.py b/src/python/grpcio/grpc/beta/_server.py
index ebf91d80ab..daa42c475a 100644
--- a/src/python/grpcio/grpc/beta/_server.py
+++ b/src/python/grpcio/grpc/beta/_server.py
@@ -72,7 +72,7 @@ def _disassemble(grpc_link, end_link, pool, event, grace):
event.set()
-class Server(object):
+class Server(interfaces.Server):
def __init__(self, grpc_link, end_link, pool):
self._grpc_link = grpc_link
@@ -82,9 +82,9 @@ class Server(object):
def add_insecure_port(self, address):
return self._grpc_link.add_port(address, None)
- def add_secure_port(self, address, intermediary_low_server_credentials):
+ def add_secure_port(self, address, server_credentials):
return self._grpc_link.add_port(
- address, intermediary_low_server_credentials)
+ address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access
def start(self):
self._grpc_link.join_link(self._end_link)
diff --git a/src/python/grpcio/grpc/beta/beta.py b/src/python/grpcio/grpc/beta/implementations.py
index b3a161087f..9b461fb3dd 100644
--- a/src/python/grpcio/grpc/beta/beta.py
+++ b/src/python/grpcio/grpc/beta/implementations.py
@@ -40,6 +40,7 @@ from grpc._adapter import _types
from grpc.beta import _connectivity_channel
from grpc.beta import _server
from grpc.beta import _stub
+from grpc.beta import interfaces
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.interfaces.face import face # pylint: disable=unused-import
@@ -47,32 +48,6 @@ _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
'Exception calling channel subscription callback!')
-@enum.unique
-class ChannelConnectivity(enum.Enum):
- """Mirrors grpc_connectivity_state in the gRPC Core.
-
- Attributes:
- IDLE: The channel is idle.
- CONNECTING: The channel is connecting.
- READY: The channel is ready to conduct RPCs.
- TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
- recover.
- FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
- """
-
- IDLE = (_types.ConnectivityState.IDLE, 'idle',)
- CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',)
- READY = (_types.ConnectivityState.READY, 'ready',)
- TRANSIENT_FAILURE = (
- _types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',)
- FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',)
-
-_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
- state: connectivity for state, connectivity in zip(
- _types.ConnectivityState, ChannelConnectivity)
-}
-
-
class ClientCredentials(object):
"""A value encapsulating the data required to create a secure Channel.
@@ -118,13 +93,14 @@ class Channel(object):
self._low_channel = low_channel
self._intermediary_low_channel = intermediary_low_channel
self._connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel, _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)
+ low_channel)
def subscribe(self, callback, try_to_connect=None):
"""Subscribes to this Channel's connectivity.
Args:
- callback: A callable to be invoked and passed this Channel's connectivity.
+ callback: A callable to be invoked and passed an
+ interfaces.ChannelConnectivity identifying this Channel's connectivity.
The callable will be invoked immediately upon subscription and again for
every change to this Channel's connectivity thereafter until it is
unsubscribed.
@@ -144,7 +120,7 @@ class Channel(object):
self._connectivity_channel.unsubscribe(callback)
-def create_insecure_channel(host, port):
+def insecure_channel(host, port):
"""Creates an insecure Channel to a remote host.
Args:
@@ -159,7 +135,7 @@ def create_insecure_channel(host, port):
return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access
-def create_secure_channel(host, port, client_credentials):
+def secure_channel(host, port, client_credentials):
"""Creates a secure Channel to a remote host.
Args:
@@ -313,86 +289,6 @@ def ssl_server_credentials(
intermediary_low_credentials._internal, intermediary_low_credentials) # pylint: disable=protected-access
-class Server(object):
- """Services RPCs."""
- __metaclass__ = abc.ABCMeta
-
- @abc.abstractmethod
- def add_insecure_port(self, address):
- """Reserves a port for insecure RPC service once this Server becomes active.
-
- This method may only be called before calling this Server's start method is
- called.
-
- Args:
- address: The address for which to open a port.
-
- Returns:
- An integer port on which RPCs will be serviced after this link has been
- started. This is typically the same number as the port number contained
- in the passed address, but will likely be different if the port number
- contained in the passed address was zero.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def add_secure_port(self, address, server_credentials):
- """Reserves a port for secure RPC service after this Server becomes active.
-
- This method may only be called before calling this Server's start method is
- called.
-
- Args:
- address: The address for which to open a port.
- server_credentials: A ServerCredentials.
-
- Returns:
- An integer port on which RPCs will be serviced after this link has been
- started. This is typically the same number as the port number contained
- in the passed address, but will likely be different if the port number
- contained in the passed address was zero.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def start(self):
- """Starts this Server's service of RPCs.
-
- This method may only be called while the server is not serving RPCs (i.e. it
- is not idempotent).
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def stop(self, grace):
- """Stops this Server's service of RPCs.
-
- All calls to this method immediately stop service of new RPCs. When existing
- RPCs are aborted is controlled by the grace period parameter passed to this
- method.
-
- This method may be called at any time and is idempotent. Passing a smaller
- grace value than has been passed in a previous call will have the effect of
- stopping the Server sooner. Passing a larger grace value than has been
- passed in a previous call will not have the effect of stopping the sooner
- later.
-
- Args:
- grace: A duration of time in seconds to allow existing RPCs to complete
- before being aborted by this Server's stopping. May be zero for
- immediate abortion of all in-progress RPCs.
-
- Returns:
- A threading.Event that will be set when this Server has completely
- stopped. The returned event may not be set until after the full grace
- period (if some ongoing RPC continues for the full length of the period)
- of it may be set much sooner (such as if this Server had no RPCs underway
- at the time it was stopped or if all RPCs that it had underway completed
- very early in the grace period).
- """
- raise NotImplementedError()
-
-
class ServerOptions(object):
"""A value encapsulating the various options for creation of a Server.
@@ -450,27 +346,8 @@ def server_options(
thread_pool, thread_pool_size, default_timeout, maximum_timeout)
-class _Server(Server):
-
- def __init__(self, underserver):
- self._underserver = underserver
-
- def add_insecure_port(self, address):
- return self._underserver.add_insecure_port(address)
-
- def add_secure_port(self, address, server_credentials):
- return self._underserver.add_secure_port(
- address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access
-
- def start(self):
- self._underserver.start()
-
- def stop(self, grace):
- return self._underserver.stop(grace)
-
-
def server(service_implementations, options=None):
- """Creates a Server with which RPCs can be serviced.
+ """Creates an interfaces.Server with which RPCs can be serviced.
Args:
service_implementations: A dictionary from service name-method name pair to
@@ -479,13 +356,12 @@ def server(service_implementations, options=None):
functionality of the returned Server.
Returns:
- A Server with which RPCs can be serviced.
+ An interfaces.Server with which RPCs can be serviced.
"""
effective_options = _EMPTY_SERVER_OPTIONS if options is None else options
- underserver = _server.server(
+ return _server.server(
service_implementations, effective_options.multi_method_implementation,
effective_options.request_deserializers,
effective_options.response_serializers, effective_options.thread_pool,
effective_options.thread_pool_size, effective_options.default_timeout,
effective_options.maximum_timeout)
- return _Server(underserver)
diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py
index 79f2620dd4..07c8618f70 100644
--- a/src/python/grpcio/grpc/beta/interfaces.py
+++ b/src/python/grpcio/grpc/beta/interfaces.py
@@ -32,6 +32,28 @@
import abc
import enum
+from grpc._adapter import _types
+
+
+@enum.unique
+class ChannelConnectivity(enum.Enum):
+ """Mirrors grpc_connectivity_state in the gRPC Core.
+
+ Attributes:
+ IDLE: The channel is idle.
+ CONNECTING: The channel is connecting.
+ READY: The channel is ready to conduct RPCs.
+ TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
+ recover.
+ FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
+ """
+ IDLE = (_types.ConnectivityState.IDLE, 'idle',)
+ CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',)
+ READY = (_types.ConnectivityState.READY, 'ready',)
+ TRANSIENT_FAILURE = (
+ _types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',)
+ FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',)
+
@enum.unique
class StatusCode(enum.Enum):
@@ -110,3 +132,83 @@ class GRPCInvocationContext(object):
def disable_next_request_compression(self):
"""Disables compression of the next request passed by the application."""
raise NotImplementedError()
+
+
+class Server(object):
+ """Services RPCs."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def add_insecure_port(self, address):
+ """Reserves a port for insecure RPC service once this Server becomes active.
+
+ This method may only be called before calling this Server's start method is
+ called.
+
+ Args:
+ address: The address for which to open a port.
+
+ Returns:
+ An integer port on which RPCs will be serviced after this link has been
+ started. This is typically the same number as the port number contained
+ in the passed address, but will likely be different if the port number
+ contained in the passed address was zero.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_secure_port(self, address, server_credentials):
+ """Reserves a port for secure RPC service after this Server becomes active.
+
+ This method may only be called before calling this Server's start method is
+ called.
+
+ Args:
+ address: The address for which to open a port.
+ server_credentials: A ServerCredentials.
+
+ Returns:
+ An integer port on which RPCs will be serviced after this link has been
+ started. This is typically the same number as the port number contained
+ in the passed address, but will likely be different if the port number
+ contained in the passed address was zero.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def start(self):
+ """Starts this Server's service of RPCs.
+
+ This method may only be called while the server is not serving RPCs (i.e. it
+ is not idempotent).
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stop(self, grace):
+ """Stops this Server's service of RPCs.
+
+ All calls to this method immediately stop service of new RPCs. When existing
+ RPCs are aborted is controlled by the grace period parameter passed to this
+ method.
+
+ This method may be called at any time and is idempotent. Passing a smaller
+ grace value than has been passed in a previous call will have the effect of
+ stopping the Server sooner. Passing a larger grace value than has been
+ passed in a previous call will not have the effect of stopping the sooner
+ later.
+
+ Args:
+ grace: A duration of time in seconds to allow existing RPCs to complete
+ before being aborted by this Server's stopping. May be zero for
+ immediate abortion of all in-progress RPCs.
+
+ Returns:
+ A threading.Event that will be set when this Server has completely
+ stopped. The returned event may not be set until after the full grace
+ period (if some ongoing RPC continues for the full length of the period)
+ of it may be set much sooner (such as if this Server had no RPCs underway
+ at the time it was stopped or if all RPCs that it had underway completed
+ very early in the grace period).
+ """
+ raise NotImplementedError()
diff --git a/src/python/grpcio/grpc/beta/utilities.py b/src/python/grpcio/grpc/beta/utilities.py
index 1b5356e3ad..fb07a76579 100644
--- a/src/python/grpcio/grpc/beta/utilities.py
+++ b/src/python/grpcio/grpc/beta/utilities.py
@@ -32,7 +32,9 @@
import threading
import time
-from grpc.beta import beta
+# implementations is referenced from specification in this module.
+from grpc.beta import implementations # pylint: disable=unused-import
+from grpc.beta import interfaces
from grpc.framework.foundation import callable_util
from grpc.framework.foundation import future
@@ -70,7 +72,8 @@ class _ChannelReadyFuture(future.Future):
def _update(self, connectivity):
with self._condition:
- if not self._cancelled and connectivity is beta.ChannelConnectivity.READY:
+ if (not self._cancelled and
+ connectivity is interfaces.ChannelConnectivity.READY):
self._matured = True
self._channel.unsubscribe(self._update)
self._condition.notify_all()
@@ -141,19 +144,19 @@ class _ChannelReadyFuture(future.Future):
def channel_ready_future(channel):
- """Creates a future.Future that matures when a beta.Channel is ready.
+ """Creates a future.Future tracking when an implementations.Channel is ready.
- Cancelling the returned future.Future does not tell the given beta.Channel to
- abandon attempts it may have been making to connect; cancelling merely
- deactivates the return future.Future's subscription to the given
- beta.Channel's connectivity.
+ Cancelling the returned future.Future does not tell the given
+ implementations.Channel to abandon attempts it may have been making to
+ connect; cancelling merely deactivates the return future.Future's
+ subscription to the given implementations.Channel's connectivity.
Args:
- channel: A beta.Channel.
+ channel: An implementations.Channel.
Returns:
A future.Future that matures when the given Channel has connectivity
- beta.ChannelConnectivity.READY.
+ interfaces.ChannelConnectivity.READY.
"""
ready_future = _ChannelReadyFuture(channel)
ready_future.start()
diff --git a/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py
index 4c8c64b06d..259b978de2 100644
--- a/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio_test/grpc_protoc_plugin/beta_python_plugin_test.py
@@ -42,7 +42,7 @@ import threading
import time
import unittest
-from grpc.beta import beta
+from grpc.beta import implementations
from grpc.framework.foundation import future
from grpc.framework.interfaces.face import face
from grpc_test.framework.common import test_constants
@@ -170,7 +170,7 @@ def _CreateService(test_pb2):
server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
port = server.add_insecure_port('[::]:0')
server.start()
- channel = beta.create_insecure_channel('localhost', port)
+ channel = implementations.insecure_channel('localhost', port)
stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
yield servicer_methods, stub
server.stop(0)
diff --git a/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py
index 89fe4b2acf..fad57da9d0 100644
--- a/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py
@@ -32,7 +32,7 @@
import threading
import unittest
-from grpc.beta import beta
+from grpc.beta import implementations
from grpc.beta import interfaces
from grpc.framework.common import cardinality
from grpc.framework.interfaces.face import utilities
@@ -159,20 +159,21 @@ class BetaFeaturesTest(unittest.TestCase):
_STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
}
- server_options = beta.server_options(
+ server_options = implementations.server_options(
thread_pool_size=test_constants.POOL_SIZE)
- self._server = beta.server(method_implementations, options=server_options)
- server_credentials = beta.ssl_server_credentials(
+ self._server = implementations.server(
+ method_implementations, options=server_options)
+ server_credentials = implementations.ssl_server_credentials(
[(resources.private_key(), resources.certificate_chain(),),])
port = self._server.add_secure_port('[::]:0', server_credentials)
self._server.start()
- self._client_credentials = beta.ssl_client_credentials(
+ self._client_credentials = implementations.ssl_client_credentials(
resources.test_root_certificates(), None, None)
- channel = test_utilities.create_not_really_secure_channel(
+ channel = test_utilities.not_really_secure_channel(
'localhost', port, self._client_credentials, _SERVER_HOST_OVERRIDE)
- stub_options = beta.stub_options(
+ stub_options = implementations.stub_options(
thread_pool_size=test_constants.POOL_SIZE)
- self._dynamic_stub = beta.dynamic_stub(
+ self._dynamic_stub = implementations.dynamic_stub(
channel, _GROUP, cardinalities, options=stub_options)
def tearDown(self):
diff --git a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
index 038464889d..b3c05bdb0c 100644
--- a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
@@ -36,14 +36,9 @@ import unittest
from grpc._adapter import _low
from grpc._adapter import _types
from grpc.beta import _connectivity_channel
+from grpc.beta import interfaces
from grpc_test.framework.common import test_constants
-_MAPPING_FUNCTION = lambda integer: integer * 200 + 17
-_MAPPING = {
- state: _MAPPING_FUNCTION(state) for state in _types.ConnectivityState}
-_IDLE, _CONNECTING, _READY, _TRANSIENT_FAILURE, _FATAL_FAILURE = map(
- _MAPPING_FUNCTION, _types.ConnectivityState)
-
def _drive_completion_queue(completion_queue):
while True:
@@ -84,7 +79,7 @@ class ChannelConnectivityTest(unittest.TestCase):
callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel, _MAPPING)
+ low_channel)
connectivity_channel.subscribe(callback.update, try_to_connect=False)
first_connectivities = callback.block_until_connectivities_satisfy(bool)
connectivity_channel.subscribe(callback.update, try_to_connect=True)
@@ -98,11 +93,16 @@ class ChannelConnectivityTest(unittest.TestCase):
connectivity_channel.unsubscribe(callback.update)
fifth_connectivities = callback.connectivities()
- self.assertSequenceEqual((_IDLE,), first_connectivities)
- self.assertNotIn(_READY, second_connectivities)
- self.assertNotIn(_READY, third_connectivities)
- self.assertNotIn(_READY, fourth_connectivities)
- self.assertNotIn(_READY, fifth_connectivities)
+ self.assertSequenceEqual(
+ (interfaces.ChannelConnectivity.IDLE,), first_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, second_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, third_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, fourth_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.READY, fifth_connectivities)
def test_immediately_connectable_channel_connectivity(self):
server_completion_queue = _low.CompletionQueue()
@@ -117,7 +117,7 @@ class ChannelConnectivityTest(unittest.TestCase):
second_callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel, _MAPPING)
+ low_channel)
connectivity_channel.subscribe(first_callback.update, try_to_connect=False)
first_connectivities = first_callback.block_until_connectivities_satisfy(
bool)
@@ -132,9 +132,11 @@ class ChannelConnectivityTest(unittest.TestCase):
bool)
# Wait for a connection that will happen (or may already have happened).
first_callback.block_until_connectivities_satisfy(
- lambda connectivities: _READY in connectivities)
+ lambda connectivities:
+ interfaces.ChannelConnectivity.READY in connectivities)
second_callback.block_until_connectivities_satisfy(
- lambda connectivities: _READY in connectivities)
+ lambda connectivities:
+ interfaces.ChannelConnectivity.READY in connectivities)
connectivity_channel.unsubscribe(first_callback.update)
connectivity_channel.unsubscribe(second_callback.update)
@@ -142,12 +144,19 @@ class ChannelConnectivityTest(unittest.TestCase):
server_completion_queue.shutdown()
server_completion_queue_thread.join()
- self.assertSequenceEqual((_IDLE,), first_connectivities)
- self.assertSequenceEqual((_IDLE,), second_connectivities)
- self.assertNotIn(_TRANSIENT_FAILURE, third_connectivities)
- self.assertNotIn(_FATAL_FAILURE, third_connectivities)
- self.assertNotIn(_TRANSIENT_FAILURE, fourth_connectivities)
- self.assertNotIn(_FATAL_FAILURE, fourth_connectivities)
+ self.assertSequenceEqual(
+ (interfaces.ChannelConnectivity.IDLE,), first_connectivities)
+ self.assertSequenceEqual(
+ (interfaces.ChannelConnectivity.IDLE,), second_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.FATAL_FAILURE, third_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.TRANSIENT_FAILURE,
+ fourth_connectivities)
+ self.assertNotIn(
+ interfaces.ChannelConnectivity.FATAL_FAILURE, fourth_connectivities)
def test_reachable_then_unreachable_channel_connectivity(self):
server_completion_queue = _low.CompletionQueue()
@@ -161,14 +170,16 @@ class ChannelConnectivityTest(unittest.TestCase):
callback = _Callback()
connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel, _MAPPING)
+ low_channel)
connectivity_channel.subscribe(callback.update, try_to_connect=True)
callback.block_until_connectivities_satisfy(
- lambda connectivities: _READY in connectivities)
+ lambda connectivities:
+ interfaces.ChannelConnectivity.READY in connectivities)
# Now take down the server and confirm that channel readiness is repudiated.
server.shutdown()
callback.block_until_connectivities_satisfy(
- lambda connectivities: connectivities[-1] is not _READY)
+ lambda connectivities:
+ connectivities[-1] is not interfaces.ChannelConnectivity.READY)
connectivity_channel.unsubscribe(callback.update)
server.shutdown()
diff --git a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py
index e9087a7949..aa33e1e6f8 100644
--- a/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_face_interface_test.py
@@ -32,7 +32,7 @@
import collections
import unittest
-from grpc.beta import beta
+from grpc.beta import implementations
from grpc.beta import interfaces
from grpc_test import resources
from grpc_test import test_common as grpc_test_common
@@ -81,25 +81,26 @@ class _Implementation(test_interfaces.Implementation):
method: method_object.cardinality()
for (group, method), method_object in methods.iteritems()}
- server_options = beta.server_options(
+ server_options = implementations.server_options(
request_deserializers=serialization_behaviors.request_deserializers,
response_serializers=serialization_behaviors.response_serializers,
thread_pool_size=test_constants.POOL_SIZE)
- server = beta.server(method_implementations, options=server_options)
- server_credentials = beta.ssl_server_credentials(
+ server = implementations.server(
+ method_implementations, options=server_options)
+ server_credentials = implementations.ssl_server_credentials(
[(resources.private_key(), resources.certificate_chain(),),])
port = server.add_secure_port('[::]:0', server_credentials)
server.start()
- client_credentials = beta.ssl_client_credentials(
+ client_credentials = implementations.ssl_client_credentials(
resources.test_root_certificates(), None, None)
- channel = test_utilities.create_not_really_secure_channel(
+ channel = test_utilities.not_really_secure_channel(
'localhost', port, client_credentials, _SERVER_HOST_OVERRIDE)
- stub_options = beta.stub_options(
+ stub_options = implementations.stub_options(
request_serializers=serialization_behaviors.request_serializers,
response_deserializers=serialization_behaviors.response_deserializers,
thread_pool_size=test_constants.POOL_SIZE)
- generic_stub = beta.generic_stub(channel, options=stub_options)
- dynamic_stub = beta.dynamic_stub(
+ generic_stub = implementations.generic_stub(channel, options=stub_options)
+ dynamic_stub = implementations.dynamic_stub(
channel, service, cardinalities, options=stub_options)
return generic_stub, {service: dynamic_stub}, server
diff --git a/src/python/grpcio_test/grpc_test/beta/_not_found_test.py b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py
index ecd10f2175..5feb997fef 100644
--- a/src/python/grpcio_test/grpc_test/beta/_not_found_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_not_found_test.py
@@ -31,7 +31,7 @@
import unittest
-from grpc.beta import beta
+from grpc.beta import implementations
from grpc.beta import interfaces
from grpc.framework.interfaces.face import face
from grpc_test.framework.common import test_constants
@@ -40,10 +40,10 @@ from grpc_test.framework.common import test_constants
class NotFoundTest(unittest.TestCase):
def setUp(self):
- self._server = beta.server({})
+ self._server = implementations.server({})
port = self._server.add_insecure_port('[::]:0')
- channel = beta.create_insecure_channel('localhost', port)
- self._generic_stub = beta.generic_stub(channel)
+ channel = implementations.insecure_channel('localhost', port)
+ self._generic_stub = implementations.generic_stub(channel)
self._server.start()
def tearDown(self):
diff --git a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
index 998e74ccf4..996cea9118 100644
--- a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
+++ b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
@@ -35,7 +35,7 @@ import unittest
from grpc._adapter import _low
from grpc._adapter import _types
-from grpc.beta import beta
+from grpc.beta import implementations
from grpc.beta import utilities
from grpc.framework.foundation import future
from grpc_test.framework.common import test_constants
@@ -69,7 +69,7 @@ class _Callback(object):
class ChannelConnectivityTest(unittest.TestCase):
def test_lonely_channel_connectivity(self):
- channel = beta.create_insecure_channel('localhost', 12345)
+ channel = implementations.insecure_channel('localhost', 12345)
callback = _Callback()
ready_future = utilities.channel_ready_future(channel)
@@ -94,7 +94,7 @@ class ChannelConnectivityTest(unittest.TestCase):
server_completion_queue_thread = threading.Thread(
target=_drive_completion_queue, args=(server_completion_queue,))
server_completion_queue_thread.start()
- channel = beta.create_insecure_channel('localhost', port)
+ channel = implementations.insecure_channel('localhost', port)
callback = _Callback()
try:
diff --git a/src/python/grpcio_test/grpc_test/beta/test_utilities.py b/src/python/grpcio_test/grpc_test/beta/test_utilities.py
index 338670478d..24a8600e12 100644
--- a/src/python/grpcio_test/grpc_test/beta/test_utilities.py
+++ b/src/python/grpcio_test/grpc_test/beta/test_utilities.py
@@ -30,25 +30,27 @@
"""Test-appropriate entry points into the gRPC Python Beta API."""
from grpc._adapter import _intermediary_low
-from grpc.beta import beta
+from grpc.beta import implementations
-def create_not_really_secure_channel(
+def not_really_secure_channel(
host, port, client_credentials, server_host_override):
"""Creates an insecure Channel to a remote host.
Args:
host: The name of the remote host to which to connect.
port: The port of the remote host to which to connect.
- client_credentials: The beta.ClientCredentials with which to connect.
+ client_credentials: The implementations.ClientCredentials with which to
+ connect.
server_host_override: The target name used for SSL host name checking.
Returns:
- A beta.Channel to the remote host through which RPCs may be conducted.
+ An implementations.Channel to the remote host through which RPCs may be
+ conducted.
"""
hostport = '%s:%d' % (host, port)
intermediary_low_channel = _intermediary_low.Channel(
hostport, client_credentials._intermediary_low_credentials,
server_host_override=server_host_override)
- return beta.Channel(
+ return implementations.Channel(
intermediary_low_channel._internal, intermediary_low_channel)