aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py')
-rw-r--r--src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py209
1 files changed, 103 insertions, 106 deletions
diff --git a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
index 3d9dd17ff6..d67693154b 100644
--- a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
+++ b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Tests of grpc._channel.Channel connectivity."""
import threading
@@ -39,125 +38,123 @@ from tests.unit import _thread_pool
def _ready_in_connectivities(connectivities):
- return grpc.ChannelConnectivity.READY in connectivities
+ return grpc.ChannelConnectivity.READY in connectivities
def _last_connectivity_is_not_ready(connectivities):
- return connectivities[-1] is not grpc.ChannelConnectivity.READY
+ return connectivities[-1] is not grpc.ChannelConnectivity.READY
class _Callback(object):
- def __init__(self):
- self._condition = threading.Condition()
- self._connectivities = []
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._connectivities = []
- def update(self, connectivity):
- with self._condition:
- self._connectivities.append(connectivity)
- self._condition.notify()
+ def update(self, connectivity):
+ with self._condition:
+ self._connectivities.append(connectivity)
+ self._condition.notify()
- def connectivities(self):
- with self._condition:
- return tuple(self._connectivities)
+ def connectivities(self):
+ with self._condition:
+ return tuple(self._connectivities)
- def block_until_connectivities_satisfy(self, predicate):
- with self._condition:
- while True:
- connectivities = tuple(self._connectivities)
- if predicate(connectivities):
- return connectivities
- else:
- self._condition.wait()
+ def block_until_connectivities_satisfy(self, predicate):
+ with self._condition:
+ while True:
+ connectivities = tuple(self._connectivities)
+ if predicate(connectivities):
+ return connectivities
+ else:
+ self._condition.wait()
class ChannelConnectivityTest(unittest.TestCase):
- def test_lonely_channel_connectivity(self):
- callback = _Callback()
-
- channel = grpc.insecure_channel('localhost:12345')
- channel.subscribe(callback.update, try_to_connect=False)
- first_connectivities = callback.block_until_connectivities_satisfy(bool)
- channel.subscribe(callback.update, try_to_connect=True)
- second_connectivities = callback.block_until_connectivities_satisfy(
- lambda connectivities: 2 <= len(connectivities))
- # Wait for a connection that will never happen.
- time.sleep(test_constants.SHORT_TIMEOUT)
- third_connectivities = callback.connectivities()
- channel.unsubscribe(callback.update)
- fourth_connectivities = callback.connectivities()
- channel.unsubscribe(callback.update)
- fifth_connectivities = callback.connectivities()
-
- self.assertSequenceEqual(
- (grpc.ChannelConnectivity.IDLE,), first_connectivities)
- self.assertNotIn(
- grpc.ChannelConnectivity.READY, second_connectivities)
- self.assertNotIn(
- grpc.ChannelConnectivity.READY, third_connectivities)
- self.assertNotIn(
- grpc.ChannelConnectivity.READY, fourth_connectivities)
- self.assertNotIn(
- grpc.ChannelConnectivity.READY, fifth_connectivities)
-
- def test_immediately_connectable_channel_connectivity(self):
- thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
- server = grpc.server(thread_pool)
- port = server.add_insecure_port('[::]:0')
- server.start()
- first_callback = _Callback()
- second_callback = _Callback()
-
- channel = grpc.insecure_channel('localhost:{}'.format(port))
- channel.subscribe(first_callback.update, try_to_connect=False)
- first_connectivities = first_callback.block_until_connectivities_satisfy(
- bool)
- # Wait for a connection that will never happen because try_to_connect=True
- # has not yet been passed.
- time.sleep(test_constants.SHORT_TIMEOUT)
- second_connectivities = first_callback.connectivities()
- channel.subscribe(second_callback.update, try_to_connect=True)
- third_connectivities = first_callback.block_until_connectivities_satisfy(
- lambda connectivities: 2 <= len(connectivities))
- fourth_connectivities = second_callback.block_until_connectivities_satisfy(
- bool)
- # Wait for a connection that will happen (or may already have happened).
- first_callback.block_until_connectivities_satisfy(_ready_in_connectivities)
- second_callback.block_until_connectivities_satisfy(_ready_in_connectivities)
- del channel
-
- self.assertSequenceEqual(
- (grpc.ChannelConnectivity.IDLE,), first_connectivities)
- self.assertSequenceEqual(
- (grpc.ChannelConnectivity.IDLE,), second_connectivities)
- self.assertNotIn(
- grpc.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities)
- self.assertNotIn(
- grpc.ChannelConnectivity.SHUTDOWN, third_connectivities)
- self.assertNotIn(
- grpc.ChannelConnectivity.TRANSIENT_FAILURE,
- fourth_connectivities)
- self.assertNotIn(
- grpc.ChannelConnectivity.SHUTDOWN, fourth_connectivities)
- self.assertFalse(thread_pool.was_used())
-
- def test_reachable_then_unreachable_channel_connectivity(self):
- thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
- server = grpc.server(thread_pool)
- port = server.add_insecure_port('[::]:0')
- server.start()
- callback = _Callback()
-
- channel = grpc.insecure_channel('localhost:{}'.format(port))
- channel.subscribe(callback.update, try_to_connect=True)
- callback.block_until_connectivities_satisfy(_ready_in_connectivities)
- # Now take down the server and confirm that channel readiness is repudiated.
- server.stop(None)
- callback.block_until_connectivities_satisfy(_last_connectivity_is_not_ready)
- channel.unsubscribe(callback.update)
- self.assertFalse(thread_pool.was_used())
+ def test_lonely_channel_connectivity(self):
+ callback = _Callback()
+
+ channel = grpc.insecure_channel('localhost:12345')
+ channel.subscribe(callback.update, try_to_connect=False)
+ first_connectivities = callback.block_until_connectivities_satisfy(bool)
+ channel.subscribe(callback.update, try_to_connect=True)
+ second_connectivities = callback.block_until_connectivities_satisfy(
+ lambda connectivities: 2 <= len(connectivities))
+ # Wait for a connection that will never happen.
+ time.sleep(test_constants.SHORT_TIMEOUT)
+ third_connectivities = callback.connectivities()
+ channel.unsubscribe(callback.update)
+ fourth_connectivities = callback.connectivities()
+ channel.unsubscribe(callback.update)
+ fifth_connectivities = callback.connectivities()
+
+ self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,),
+ first_connectivities)
+ self.assertNotIn(grpc.ChannelConnectivity.READY, second_connectivities)
+ self.assertNotIn(grpc.ChannelConnectivity.READY, third_connectivities)
+ self.assertNotIn(grpc.ChannelConnectivity.READY, fourth_connectivities)
+ self.assertNotIn(grpc.ChannelConnectivity.READY, fifth_connectivities)
+
+ def test_immediately_connectable_channel_connectivity(self):
+ thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
+ server = grpc.server(thread_pool)
+ port = server.add_insecure_port('[::]:0')
+ server.start()
+ first_callback = _Callback()
+ second_callback = _Callback()
+
+ channel = grpc.insecure_channel('localhost:{}'.format(port))
+ channel.subscribe(first_callback.update, try_to_connect=False)
+ first_connectivities = first_callback.block_until_connectivities_satisfy(
+ bool)
+ # Wait for a connection that will never happen because try_to_connect=True
+ # has not yet been passed.
+ time.sleep(test_constants.SHORT_TIMEOUT)
+ second_connectivities = first_callback.connectivities()
+ channel.subscribe(second_callback.update, try_to_connect=True)
+ third_connectivities = first_callback.block_until_connectivities_satisfy(
+ lambda connectivities: 2 <= len(connectivities))
+ fourth_connectivities = second_callback.block_until_connectivities_satisfy(
+ bool)
+ # Wait for a connection that will happen (or may already have happened).
+ first_callback.block_until_connectivities_satisfy(
+ _ready_in_connectivities)
+ second_callback.block_until_connectivities_satisfy(
+ _ready_in_connectivities)
+ del channel
+
+ self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,),
+ first_connectivities)
+ self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,),
+ second_connectivities)
+ self.assertNotIn(grpc.ChannelConnectivity.TRANSIENT_FAILURE,
+ third_connectivities)
+ self.assertNotIn(grpc.ChannelConnectivity.SHUTDOWN,
+ third_connectivities)
+ self.assertNotIn(grpc.ChannelConnectivity.TRANSIENT_FAILURE,
+ fourth_connectivities)
+ self.assertNotIn(grpc.ChannelConnectivity.SHUTDOWN,
+ fourth_connectivities)
+ self.assertFalse(thread_pool.was_used())
+
+ def test_reachable_then_unreachable_channel_connectivity(self):
+ thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
+ server = grpc.server(thread_pool)
+ port = server.add_insecure_port('[::]:0')
+ server.start()
+ callback = _Callback()
+
+ channel = grpc.insecure_channel('localhost:{}'.format(port))
+ channel.subscribe(callback.update, try_to_connect=True)
+ callback.block_until_connectivities_satisfy(_ready_in_connectivities)
+ # Now take down the server and confirm that channel readiness is repudiated.
+ server.stop(None)
+ callback.block_until_connectivities_satisfy(
+ _last_connectivity_is_not_ready)
+ channel.unsubscribe(callback.update)
+ self.assertFalse(thread_pool.was_used())
if __name__ == '__main__':
- unittest.main(verbosity=2)
+ unittest.main(verbosity=2)