aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-08-27 12:33:20 -0400
committerGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-08-27 12:33:20 -0400
commit41d2b291eb86bec036b2a9a55e4369c55d5f1177 (patch)
tree284eb807566b2e6774ed6ae1bdc626b55ab8d97d
parent08a4cf361f0807887346d92d68d86ae86c28cfa7 (diff)
parent8e3dc00d93232b8cc4f3760fa252320fe1ded860 (diff)
Merge pull request #3093 from nathanielmanistaatgoogle/channel
The Beta API Channel
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types/channel.c2
-rw-r--r--src/python/grpcio/grpc/beta/__init__.py28
-rw-r--r--src/python/grpcio/grpc/beta/_connectivity_channel.py148
-rw-r--r--src/python/grpcio/grpc/beta/beta.py114
-rw-r--r--src/python/grpcio/grpc/beta/utilities.py161
-rw-r--r--src/python/grpcio_test/grpc_test/beta/__init__.py30
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py180
-rw-r--r--src/python/grpcio_test/grpc_test/beta/_utilities_test.py123
8 files changed, 785 insertions, 1 deletions
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/channel.c b/src/python/grpcio/grpc/_adapter/_c/types/channel.c
index c577ac05eb..cf866dd80c 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/channel.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/channel.c
@@ -164,7 +164,7 @@ PyObject *pygrpc_Channel_watch_connectivity_state(
int last_observed_state;
CompletionQueue *completion_queue;
char *keywords[] = {"last_observed_state", "deadline",
- "completion_queue", "tag"};
+ "completion_queue", "tag", NULL};
if (!PyArg_ParseTupleAndKeywords(
args, kwargs, "idO!O:watch_connectivity_state", keywords,
&last_observed_state, &deadline, &pygrpc_CompletionQueue_type,
diff --git a/src/python/grpcio/grpc/beta/__init__.py b/src/python/grpcio/grpc/beta/__init__.py
new file mode 100644
index 0000000000..b89398809f
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/__init__.py
@@ -0,0 +1,28 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/python/grpcio/grpc/beta/_connectivity_channel.py b/src/python/grpcio/grpc/beta/_connectivity_channel.py
new file mode 100644
index 0000000000..457ede79f2
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/_connectivity_channel.py
@@ -0,0 +1,148 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Affords a connectivity-state-listenable channel."""
+
+import threading
+import time
+
+from grpc._adapter import _low
+from grpc.framework.foundation import callable_util
+
+_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
+ 'Exception calling channel subscription callback!')
+
+
+class ConnectivityChannel(object):
+
+ def __init__(self, low_channel, mapping):
+ self._lock = threading.Lock()
+ self._low_channel = low_channel
+ self._mapping = mapping
+
+ self._polling = False
+ self._connectivity = None
+ self._try_to_connect = False
+ self._callbacks_and_connectivities = []
+ self._delivering = False
+
+ def _deliveries(self, connectivity):
+ callbacks_needing_update = []
+ for callback_and_connectivity in self._callbacks_and_connectivities:
+ callback, callback_connectivity = callback_and_connectivity
+ if callback_connectivity is not connectivity:
+ callbacks_needing_update.append(callback)
+ callback_and_connectivity[1] = connectivity
+ return callbacks_needing_update
+
+ def _deliver(self, initial_connectivity, initial_callbacks):
+ connectivity = initial_connectivity
+ callbacks = initial_callbacks
+ while True:
+ for callback in callbacks:
+ callable_util.call_logging_exceptions(
+ callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE,
+ connectivity)
+ with self._lock:
+ callbacks = self._deliveries(self._connectivity)
+ if callbacks:
+ connectivity = self._connectivity
+ else:
+ self._delivering = False
+ return
+
+ def _spawn_delivery(self, connectivity, callbacks):
+ delivering_thread = threading.Thread(
+ target=self._deliver, args=(connectivity, callbacks,))
+ delivering_thread.start()
+ self._delivering = True
+
+ # TODO(issue 3064): Don't poll.
+ def _poll_connectivity(self, low_channel, initial_try_to_connect):
+ try_to_connect = initial_try_to_connect
+ low_connectivity = low_channel.check_connectivity_state(try_to_connect)
+ with self._lock:
+ self._connectivity = self._mapping[low_connectivity]
+ callbacks = tuple(
+ callback for callback, unused_but_known_to_be_none_connectivity
+ in self._callbacks_and_connectivities)
+ for callback_and_connectivity in self._callbacks_and_connectivities:
+ callback_and_connectivity[1] = self._connectivity
+ if callbacks:
+ self._spawn_delivery(self._connectivity, callbacks)
+ completion_queue = _low.CompletionQueue()
+ while True:
+ low_channel.watch_connectivity_state(
+ low_connectivity, time.time() + 0.2, completion_queue, None)
+ event = completion_queue.next()
+ with self._lock:
+ if not self._callbacks_and_connectivities and not self._try_to_connect:
+ self._polling = False
+ self._connectivity = None
+ completion_queue.shutdown()
+ break
+ try_to_connect = self._try_to_connect
+ self._try_to_connect = False
+ if event.success or try_to_connect:
+ low_connectivity = low_channel.check_connectivity_state(try_to_connect)
+ with self._lock:
+ self._connectivity = self._mapping[low_connectivity]
+ if not self._delivering:
+ callbacks = self._deliveries(self._connectivity)
+ if callbacks:
+ self._spawn_delivery(self._connectivity, callbacks)
+
+ def subscribe(self, callback, try_to_connect):
+ with self._lock:
+ if not self._callbacks_and_connectivities and not self._polling:
+ polling_thread = threading.Thread(
+ target=self._poll_connectivity,
+ args=(self._low_channel, bool(try_to_connect)))
+ polling_thread.start()
+ self._polling = True
+ self._callbacks_and_connectivities.append([callback, None])
+ elif not self._delivering and self._connectivity is not None:
+ self._spawn_delivery(self._connectivity, (callback,))
+ self._try_to_connect |= bool(try_to_connect)
+ self._callbacks_and_connectivities.append(
+ [callback, self._connectivity])
+ else:
+ self._try_to_connect |= bool(try_to_connect)
+ self._callbacks_and_connectivities.append([callback, None])
+
+ def unsubscribe(self, callback):
+ with self._lock:
+ for index, (subscribed_callback, unused_connectivity) in enumerate(
+ self._callbacks_and_connectivities):
+ if callback == subscribed_callback:
+ self._callbacks_and_connectivities.pop(index)
+ break
+
+ def low_channel(self):
+ return self._low_channel
diff --git a/src/python/grpcio/grpc/beta/beta.py b/src/python/grpcio/grpc/beta/beta.py
new file mode 100644
index 0000000000..40cad5e486
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/beta.py
@@ -0,0 +1,114 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Entry points into gRPC Python Beta."""
+
+import enum
+
+from grpc._adapter import _low
+from grpc._adapter import _types
+from grpc.beta import _connectivity_channel
+
+_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
+ 'Exception calling channel subscription callback!')
+
+
+@enum.unique
+class ChannelConnectivity(enum.Enum):
+ """Mirrors grpc_connectivity_state in the gRPC Core.
+
+ Attributes:
+ IDLE: The channel is idle.
+ CONNECTING: The channel is connecting.
+ READY: The channel is ready to conduct RPCs.
+ TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
+ recover.
+ FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
+ """
+
+ IDLE = (_types.ConnectivityState.IDLE, 'idle',)
+ CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',)
+ READY = (_types.ConnectivityState.READY, 'ready',)
+ TRANSIENT_FAILURE = (
+ _types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',)
+ FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',)
+
+_LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY = {
+ state: connectivity for state, connectivity in zip(
+ _types.ConnectivityState, ChannelConnectivity)
+}
+
+
+class Channel(object):
+ """A channel to a remote host through which RPCs may be conducted.
+
+ Only the "subscribe" and "unsubscribe" methods are supported for application
+ use. This class' instance constructor and all other attributes are
+ unsupported.
+ """
+
+ def __init__(self, low_channel):
+ self._connectivity_channel = _connectivity_channel.ConnectivityChannel(
+ low_channel, _LOW_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)
+
+ def subscribe(self, callback, try_to_connect=None):
+ """Subscribes to this Channel's connectivity.
+
+ Args:
+ callback: A callable to be invoked and passed this Channel's connectivity.
+ The callable will be invoked immediately upon subscription and again for
+ every change to this Channel's connectivity thereafter until it is
+ unsubscribed.
+ try_to_connect: A boolean indicating whether or not this Channel should
+ attempt to connect if it is not already connected and ready to conduct
+ RPCs.
+ """
+ self._connectivity_channel.subscribe(callback, try_to_connect)
+
+ def unsubscribe(self, callback):
+ """Unsubscribes a callback from this Channel's connectivity.
+
+ Args:
+ callback: A callable previously registered with this Channel from having
+ been passed to its "subscribe" method.
+ """
+ self._connectivity_channel.unsubscribe(callback)
+
+
+def create_insecure_channel(host, port):
+ """Creates an insecure Channel to a remote host.
+
+ Args:
+ host: The name of the remote host to which to connect.
+ port: The port of the remote host to which to connect.
+
+ Returns:
+ A Channel to the remote host through which RPCs may be conducted.
+ """
+ return Channel(_low.Channel('%s:%d' % (host, port), ()))
diff --git a/src/python/grpcio/grpc/beta/utilities.py b/src/python/grpcio/grpc/beta/utilities.py
new file mode 100644
index 0000000000..1b5356e3ad
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/utilities.py
@@ -0,0 +1,161 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Utilities for the gRPC Python Beta API."""
+
+import threading
+import time
+
+from grpc.beta import beta
+from grpc.framework.foundation import callable_util
+from grpc.framework.foundation import future
+
+_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
+ 'Exception calling connectivity future "done" callback!')
+
+
+class _ChannelReadyFuture(future.Future):
+
+ def __init__(self, channel):
+ self._condition = threading.Condition()
+ self._channel = channel
+
+ self._matured = False
+ self._cancelled = False
+ self._done_callbacks = []
+
+ def _block(self, timeout):
+ until = None if timeout is None else time.time() + timeout
+ with self._condition:
+ while True:
+ if self._cancelled:
+ raise future.CancelledError()
+ elif self._matured:
+ return
+ else:
+ if until is None:
+ self._condition.wait()
+ else:
+ remaining = until - time.time()
+ if remaining < 0:
+ raise future.TimeoutError()
+ else:
+ self._condition.wait(timeout=remaining)
+
+ def _update(self, connectivity):
+ with self._condition:
+ if not self._cancelled and connectivity is beta.ChannelConnectivity.READY:
+ self._matured = True
+ self._channel.unsubscribe(self._update)
+ self._condition.notify_all()
+ done_callbacks = tuple(self._done_callbacks)
+ self._done_callbacks = None
+ else:
+ return
+
+ for done_callback in done_callbacks:
+ callable_util.call_logging_exceptions(
+ done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
+
+ def cancel(self):
+ with self._condition:
+ if not self._matured:
+ self._cancelled = True
+ self._channel.unsubscribe(self._update)
+ self._condition.notify_all()
+ done_callbacks = tuple(self._done_callbacks)
+ self._done_callbacks = None
+ else:
+ return False
+
+ for done_callback in done_callbacks:
+ callable_util.call_logging_exceptions(
+ done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self)
+
+ def cancelled(self):
+ with self._condition:
+ return self._cancelled
+
+ def running(self):
+ with self._condition:
+ return not self._cancelled and not self._matured
+
+ def done(self):
+ with self._condition:
+ return self._cancelled or self._matured
+
+ def result(self, timeout=None):
+ self._block(timeout)
+ return None
+
+ def exception(self, timeout=None):
+ self._block(timeout)
+ return None
+
+ def traceback(self, timeout=None):
+ self._block(timeout)
+ return None
+
+ def add_done_callback(self, fn):
+ with self._condition:
+ if not self._cancelled and not self._matured:
+ self._done_callbacks.append(fn)
+ return
+
+ fn(self)
+
+ def start(self):
+ with self._condition:
+ self._channel.subscribe(self._update, try_to_connect=True)
+
+ def __del__(self):
+ with self._condition:
+ if not self._cancelled and not self._matured:
+ self._channel.unsubscribe(self._update)
+
+
+def channel_ready_future(channel):
+ """Creates a future.Future that matures when a beta.Channel is ready.
+
+ Cancelling the returned future.Future does not tell the given beta.Channel to
+ abandon attempts it may have been making to connect; cancelling merely
+ deactivates the return future.Future's subscription to the given
+ beta.Channel's connectivity.
+
+ Args:
+ channel: A beta.Channel.
+
+ Returns:
+ A future.Future that matures when the given Channel has connectivity
+ beta.ChannelConnectivity.READY.
+ """
+ ready_future = _ChannelReadyFuture(channel)
+ ready_future.start()
+ return ready_future
+
diff --git a/src/python/grpcio_test/grpc_test/beta/__init__.py b/src/python/grpcio_test/grpc_test/beta/__init__.py
new file mode 100644
index 0000000000..7086519106
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/beta/__init__.py
@@ -0,0 +1,30 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
diff --git a/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
new file mode 100644
index 0000000000..038464889d
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/beta/_connectivity_channel_test.py
@@ -0,0 +1,180 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of grpc.beta._connectivity_channel."""
+
+import threading
+import time
+import unittest
+
+from grpc._adapter import _low
+from grpc._adapter import _types
+from grpc.beta import _connectivity_channel
+from grpc_test.framework.common import test_constants
+
+_MAPPING_FUNCTION = lambda integer: integer * 200 + 17
+_MAPPING = {
+ state: _MAPPING_FUNCTION(state) for state in _types.ConnectivityState}
+_IDLE, _CONNECTING, _READY, _TRANSIENT_FAILURE, _FATAL_FAILURE = map(
+ _MAPPING_FUNCTION, _types.ConnectivityState)
+
+
+def _drive_completion_queue(completion_queue):
+ while True:
+ event = completion_queue.next(time.time() + 24 * 60 * 60)
+ if event.type == _types.EventType.QUEUE_SHUTDOWN:
+ break
+
+
+class _Callback(object):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._connectivities = []
+
+ def update(self, connectivity):
+ with self._condition:
+ self._connectivities.append(connectivity)
+ self._condition.notify()
+
+ def connectivities(self):
+ with self._condition:
+ return tuple(self._connectivities)
+
+ def block_until_connectivities_satisfy(self, predicate):
+ with self._condition:
+ while True:
+ connectivities = tuple(self._connectivities)
+ if predicate(connectivities):
+ return connectivities
+ else:
+ self._condition.wait()
+
+
+class ChannelConnectivityTest(unittest.TestCase):
+
+ def test_lonely_channel_connectivity(self):
+ low_channel = _low.Channel('localhost:12345', ())
+ callback = _Callback()
+
+ connectivity_channel = _connectivity_channel.ConnectivityChannel(
+ low_channel, _MAPPING)
+ connectivity_channel.subscribe(callback.update, try_to_connect=False)
+ first_connectivities = callback.block_until_connectivities_satisfy(bool)
+ connectivity_channel.subscribe(callback.update, try_to_connect=True)
+ second_connectivities = callback.block_until_connectivities_satisfy(
+ lambda connectivities: 2 <= len(connectivities))
+ # Wait for a connection that will never happen.
+ time.sleep(test_constants.SHORT_TIMEOUT)
+ third_connectivities = callback.connectivities()
+ connectivity_channel.unsubscribe(callback.update)
+ fourth_connectivities = callback.connectivities()
+ connectivity_channel.unsubscribe(callback.update)
+ fifth_connectivities = callback.connectivities()
+
+ self.assertSequenceEqual((_IDLE,), first_connectivities)
+ self.assertNotIn(_READY, second_connectivities)
+ self.assertNotIn(_READY, third_connectivities)
+ self.assertNotIn(_READY, fourth_connectivities)
+ self.assertNotIn(_READY, fifth_connectivities)
+
+ def test_immediately_connectable_channel_connectivity(self):
+ server_completion_queue = _low.CompletionQueue()
+ server = _low.Server(server_completion_queue, [])
+ port = server.add_http2_port('[::]:0')
+ server.start()
+ server_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue, args=(server_completion_queue,))
+ server_completion_queue_thread.start()
+ low_channel = _low.Channel('localhost:%d' % port, ())
+ first_callback = _Callback()
+ second_callback = _Callback()
+
+ connectivity_channel = _connectivity_channel.ConnectivityChannel(
+ low_channel, _MAPPING)
+ connectivity_channel.subscribe(first_callback.update, try_to_connect=False)
+ first_connectivities = first_callback.block_until_connectivities_satisfy(
+ bool)
+ # Wait for a connection that will never happen because try_to_connect=True
+ # has not yet been passed.
+ time.sleep(test_constants.SHORT_TIMEOUT)
+ second_connectivities = first_callback.connectivities()
+ connectivity_channel.subscribe(second_callback.update, try_to_connect=True)
+ third_connectivities = first_callback.block_until_connectivities_satisfy(
+ lambda connectivities: 2 <= len(connectivities))
+ fourth_connectivities = second_callback.block_until_connectivities_satisfy(
+ bool)
+ # Wait for a connection that will happen (or may already have happened).
+ first_callback.block_until_connectivities_satisfy(
+ lambda connectivities: _READY in connectivities)
+ second_callback.block_until_connectivities_satisfy(
+ lambda connectivities: _READY in connectivities)
+ connectivity_channel.unsubscribe(first_callback.update)
+ connectivity_channel.unsubscribe(second_callback.update)
+
+ server.shutdown()
+ server_completion_queue.shutdown()
+ server_completion_queue_thread.join()
+
+ self.assertSequenceEqual((_IDLE,), first_connectivities)
+ self.assertSequenceEqual((_IDLE,), second_connectivities)
+ self.assertNotIn(_TRANSIENT_FAILURE, third_connectivities)
+ self.assertNotIn(_FATAL_FAILURE, third_connectivities)
+ self.assertNotIn(_TRANSIENT_FAILURE, fourth_connectivities)
+ self.assertNotIn(_FATAL_FAILURE, fourth_connectivities)
+
+ def test_reachable_then_unreachable_channel_connectivity(self):
+ server_completion_queue = _low.CompletionQueue()
+ server = _low.Server(server_completion_queue, [])
+ port = server.add_http2_port('[::]:0')
+ server.start()
+ server_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue, args=(server_completion_queue,))
+ server_completion_queue_thread.start()
+ low_channel = _low.Channel('localhost:%d' % port, ())
+ callback = _Callback()
+
+ connectivity_channel = _connectivity_channel.ConnectivityChannel(
+ low_channel, _MAPPING)
+ connectivity_channel.subscribe(callback.update, try_to_connect=True)
+ callback.block_until_connectivities_satisfy(
+ lambda connectivities: _READY in connectivities)
+ # Now take down the server and confirm that channel readiness is repudiated.
+ server.shutdown()
+ callback.block_until_connectivities_satisfy(
+ lambda connectivities: connectivities[-1] is not _READY)
+ connectivity_channel.unsubscribe(callback.update)
+
+ server.shutdown()
+ server_completion_queue.shutdown()
+ server_completion_queue_thread.join()
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_test/grpc_test/beta/_utilities_test.py b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
new file mode 100644
index 0000000000..998e74ccf4
--- /dev/null
+++ b/src/python/grpcio_test/grpc_test/beta/_utilities_test.py
@@ -0,0 +1,123 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of grpc.beta.utilities."""
+
+import threading
+import time
+import unittest
+
+from grpc._adapter import _low
+from grpc._adapter import _types
+from grpc.beta import beta
+from grpc.beta import utilities
+from grpc.framework.foundation import future
+from grpc_test.framework.common import test_constants
+
+
+def _drive_completion_queue(completion_queue):
+ while True:
+ event = completion_queue.next(time.time() + 24 * 60 * 60)
+ if event.type == _types.EventType.QUEUE_SHUTDOWN:
+ break
+
+
+class _Callback(object):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._value = None
+
+ def accept_value(self, value):
+ with self._condition:
+ self._value = value
+ self._condition.notify_all()
+
+ def block_until_called(self):
+ with self._condition:
+ while self._value is None:
+ self._condition.wait()
+ return self._value
+
+
+class ChannelConnectivityTest(unittest.TestCase):
+
+ def test_lonely_channel_connectivity(self):
+ channel = beta.create_insecure_channel('localhost', 12345)
+ callback = _Callback()
+
+ ready_future = utilities.channel_ready_future(channel)
+ ready_future.add_done_callback(callback.accept_value)
+ with self.assertRaises(future.TimeoutError):
+ ready_future.result(test_constants.SHORT_TIMEOUT)
+ self.assertFalse(ready_future.cancelled())
+ self.assertFalse(ready_future.done())
+ self.assertTrue(ready_future.running())
+ ready_future.cancel()
+ value_passed_to_callback = callback.block_until_called()
+ self.assertIs(ready_future, value_passed_to_callback)
+ self.assertTrue(ready_future.cancelled())
+ self.assertTrue(ready_future.done())
+ self.assertFalse(ready_future.running())
+
+ def test_immediately_connectable_channel_connectivity(self):
+ server_completion_queue = _low.CompletionQueue()
+ server = _low.Server(server_completion_queue, [])
+ port = server.add_http2_port('[::]:0')
+ server.start()
+ server_completion_queue_thread = threading.Thread(
+ target=_drive_completion_queue, args=(server_completion_queue,))
+ server_completion_queue_thread.start()
+ channel = beta.create_insecure_channel('localhost', port)
+ callback = _Callback()
+
+ try:
+ ready_future = utilities.channel_ready_future(channel)
+ ready_future.add_done_callback(callback.accept_value)
+ self.assertIsNone(
+ ready_future.result(test_constants.SHORT_TIMEOUT))
+ value_passed_to_callback = callback.block_until_called()
+ self.assertIs(ready_future, value_passed_to_callback)
+ self.assertFalse(ready_future.cancelled())
+ self.assertTrue(ready_future.done())
+ self.assertFalse(ready_future.running())
+ # Cancellation after maturity has no effect.
+ ready_future.cancel()
+ self.assertFalse(ready_future.cancelled())
+ self.assertTrue(ready_future.done())
+ self.assertFalse(ready_future.running())
+ finally:
+ ready_future.cancel()
+ server.shutdown()
+ server_completion_queue.shutdown()
+ server_completion_queue_thread.join()
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)