diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py')
-rw-r--r-- | src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py | 209 |
1 files changed, 103 insertions, 106 deletions
diff --git a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py index 3d9dd17ff6..d67693154b 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Tests of grpc._channel.Channel connectivity.""" import threading @@ -39,125 +38,123 @@ from tests.unit import _thread_pool def _ready_in_connectivities(connectivities): - return grpc.ChannelConnectivity.READY in connectivities + return grpc.ChannelConnectivity.READY in connectivities def _last_connectivity_is_not_ready(connectivities): - return connectivities[-1] is not grpc.ChannelConnectivity.READY + return connectivities[-1] is not grpc.ChannelConnectivity.READY class _Callback(object): - def __init__(self): - self._condition = threading.Condition() - self._connectivities = [] + def __init__(self): + self._condition = threading.Condition() + self._connectivities = [] - def update(self, connectivity): - with self._condition: - self._connectivities.append(connectivity) - self._condition.notify() + def update(self, connectivity): + with self._condition: + self._connectivities.append(connectivity) + self._condition.notify() - def connectivities(self): - with self._condition: - return tuple(self._connectivities) + def connectivities(self): + with self._condition: + return tuple(self._connectivities) - def block_until_connectivities_satisfy(self, predicate): - with self._condition: - while True: - connectivities = tuple(self._connectivities) - if predicate(connectivities): - return connectivities - else: - self._condition.wait() + def block_until_connectivities_satisfy(self, predicate): + with self._condition: + while True: + connectivities = tuple(self._connectivities) + if predicate(connectivities): + return connectivities + else: + self._condition.wait() class ChannelConnectivityTest(unittest.TestCase): - def test_lonely_channel_connectivity(self): - callback = _Callback() - - channel = grpc.insecure_channel('localhost:12345') - channel.subscribe(callback.update, try_to_connect=False) - first_connectivities = callback.block_until_connectivities_satisfy(bool) - channel.subscribe(callback.update, try_to_connect=True) - second_connectivities = callback.block_until_connectivities_satisfy( - lambda connectivities: 2 <= len(connectivities)) - # Wait for a connection that will never happen. - time.sleep(test_constants.SHORT_TIMEOUT) - third_connectivities = callback.connectivities() - channel.unsubscribe(callback.update) - fourth_connectivities = callback.connectivities() - channel.unsubscribe(callback.update) - fifth_connectivities = callback.connectivities() - - self.assertSequenceEqual( - (grpc.ChannelConnectivity.IDLE,), first_connectivities) - self.assertNotIn( - grpc.ChannelConnectivity.READY, second_connectivities) - self.assertNotIn( - grpc.ChannelConnectivity.READY, third_connectivities) - self.assertNotIn( - grpc.ChannelConnectivity.READY, fourth_connectivities) - self.assertNotIn( - grpc.ChannelConnectivity.READY, fifth_connectivities) - - def test_immediately_connectable_channel_connectivity(self): - thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) - server = grpc.server(thread_pool) - port = server.add_insecure_port('[::]:0') - server.start() - first_callback = _Callback() - second_callback = _Callback() - - channel = grpc.insecure_channel('localhost:{}'.format(port)) - channel.subscribe(first_callback.update, try_to_connect=False) - first_connectivities = first_callback.block_until_connectivities_satisfy( - bool) - # Wait for a connection that will never happen because try_to_connect=True - # has not yet been passed. - time.sleep(test_constants.SHORT_TIMEOUT) - second_connectivities = first_callback.connectivities() - channel.subscribe(second_callback.update, try_to_connect=True) - third_connectivities = first_callback.block_until_connectivities_satisfy( - lambda connectivities: 2 <= len(connectivities)) - fourth_connectivities = second_callback.block_until_connectivities_satisfy( - bool) - # Wait for a connection that will happen (or may already have happened). - first_callback.block_until_connectivities_satisfy(_ready_in_connectivities) - second_callback.block_until_connectivities_satisfy(_ready_in_connectivities) - del channel - - self.assertSequenceEqual( - (grpc.ChannelConnectivity.IDLE,), first_connectivities) - self.assertSequenceEqual( - (grpc.ChannelConnectivity.IDLE,), second_connectivities) - self.assertNotIn( - grpc.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities) - self.assertNotIn( - grpc.ChannelConnectivity.SHUTDOWN, third_connectivities) - self.assertNotIn( - grpc.ChannelConnectivity.TRANSIENT_FAILURE, - fourth_connectivities) - self.assertNotIn( - grpc.ChannelConnectivity.SHUTDOWN, fourth_connectivities) - self.assertFalse(thread_pool.was_used()) - - def test_reachable_then_unreachable_channel_connectivity(self): - thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) - server = grpc.server(thread_pool) - port = server.add_insecure_port('[::]:0') - server.start() - callback = _Callback() - - channel = grpc.insecure_channel('localhost:{}'.format(port)) - channel.subscribe(callback.update, try_to_connect=True) - callback.block_until_connectivities_satisfy(_ready_in_connectivities) - # Now take down the server and confirm that channel readiness is repudiated. - server.stop(None) - callback.block_until_connectivities_satisfy(_last_connectivity_is_not_ready) - channel.unsubscribe(callback.update) - self.assertFalse(thread_pool.was_used()) + def test_lonely_channel_connectivity(self): + callback = _Callback() + + channel = grpc.insecure_channel('localhost:12345') + channel.subscribe(callback.update, try_to_connect=False) + first_connectivities = callback.block_until_connectivities_satisfy(bool) + channel.subscribe(callback.update, try_to_connect=True) + second_connectivities = callback.block_until_connectivities_satisfy( + lambda connectivities: 2 <= len(connectivities)) + # Wait for a connection that will never happen. + time.sleep(test_constants.SHORT_TIMEOUT) + third_connectivities = callback.connectivities() + channel.unsubscribe(callback.update) + fourth_connectivities = callback.connectivities() + channel.unsubscribe(callback.update) + fifth_connectivities = callback.connectivities() + + self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,), + first_connectivities) + self.assertNotIn(grpc.ChannelConnectivity.READY, second_connectivities) + self.assertNotIn(grpc.ChannelConnectivity.READY, third_connectivities) + self.assertNotIn(grpc.ChannelConnectivity.READY, fourth_connectivities) + self.assertNotIn(grpc.ChannelConnectivity.READY, fifth_connectivities) + + def test_immediately_connectable_channel_connectivity(self): + thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) + server = grpc.server(thread_pool) + port = server.add_insecure_port('[::]:0') + server.start() + first_callback = _Callback() + second_callback = _Callback() + + channel = grpc.insecure_channel('localhost:{}'.format(port)) + channel.subscribe(first_callback.update, try_to_connect=False) + first_connectivities = first_callback.block_until_connectivities_satisfy( + bool) + # Wait for a connection that will never happen because try_to_connect=True + # has not yet been passed. + time.sleep(test_constants.SHORT_TIMEOUT) + second_connectivities = first_callback.connectivities() + channel.subscribe(second_callback.update, try_to_connect=True) + third_connectivities = first_callback.block_until_connectivities_satisfy( + lambda connectivities: 2 <= len(connectivities)) + fourth_connectivities = second_callback.block_until_connectivities_satisfy( + bool) + # Wait for a connection that will happen (or may already have happened). + first_callback.block_until_connectivities_satisfy( + _ready_in_connectivities) + second_callback.block_until_connectivities_satisfy( + _ready_in_connectivities) + del channel + + self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,), + first_connectivities) + self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,), + second_connectivities) + self.assertNotIn(grpc.ChannelConnectivity.TRANSIENT_FAILURE, + third_connectivities) + self.assertNotIn(grpc.ChannelConnectivity.SHUTDOWN, + third_connectivities) + self.assertNotIn(grpc.ChannelConnectivity.TRANSIENT_FAILURE, + fourth_connectivities) + self.assertNotIn(grpc.ChannelConnectivity.SHUTDOWN, + fourth_connectivities) + self.assertFalse(thread_pool.was_used()) + + def test_reachable_then_unreachable_channel_connectivity(self): + thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) + server = grpc.server(thread_pool) + port = server.add_insecure_port('[::]:0') + server.start() + callback = _Callback() + + channel = grpc.insecure_channel('localhost:{}'.format(port)) + channel.subscribe(callback.update, try_to_connect=True) + callback.block_until_connectivities_satisfy(_ready_in_connectivities) + # Now take down the server and confirm that channel readiness is repudiated. + server.stop(None) + callback.block_until_connectivities_satisfy( + _last_connectivity_is_not_ready) + channel.unsubscribe(callback.update) + self.assertFalse(thread_pool.was_used()) if __name__ == '__main__': - unittest.main(verbosity=2) + unittest.main(verbosity=2) |