aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/unit/_reconnect_test.py
blob: d4ea126e2b5a578c69cc26309aba146117c47d0f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests that a channel will reconnect if a connection is dropped"""

import socket
import time
import logging
import unittest

import grpc
from grpc.framework.foundation import logging_pool

from tests.unit.framework.common import test_constants

# Payload the client sends on every call made by the test.
_REQUEST = b'\x00\x00\x00'
# Payload returned by the server-side handler; the test asserts that each
# call yields exactly this value.
_RESPONSE = b'\x00\x00\x01'

# Method path handed to channel.unary_unary(); it corresponds to the
# 'UnaryUnary' handler registered under the 'test' service in the test.
_UNARY_UNARY = '/test/UnaryUnary'


def _handle_unary_unary(unused_request, unused_servicer_context):
    """Unary-unary handler that ignores its arguments and returns _RESPONSE."""
    return _RESPONSE


def _get_reuse_socket_option():
    try:
        return socket.SO_REUSEPORT
    except AttributeError:
        # SO_REUSEPORT is unavailable on Windows, but SO_REUSEADDR
        # allows forcibly re-binding to a port
        return socket.SO_REUSEADDR


def _pick_and_bind_port(sock_opt):
    # Reserve a port, when we restart the server we want
    # to hold onto the port
    port = 0
    for address_family in (socket.AF_INET6, socket.AF_INET):
        try:
            s = socket.socket(address_family, socket.SOCK_STREAM)
        except socket.error:
            continue  # this address family is unavailable
        s.setsockopt(socket.SOL_SOCKET, sock_opt, 1)
        try:
            s.bind(('localhost', port))
            # for socket.SOCK_STREAM sockets, it is necessary to call
            # listen to get the desired behavior.
            s.listen(1)
            port = s.getsockname()[1]
        except socket.error:
            # port was not available on the current address family
            # try again
            port = 0
            break
        finally:
            s.close()
    if s:
        return port if port != 0 else _pick_and_bind_port(sock_opt)
    else:
        return None  # no address family was available


class ReconnectTest(unittest.TestCase):
    """Checks that a channel keeps working after its server is restarted."""

    def test_reconnect(self):
        """Call a server, restart it on the same port, then call it again.

        The same channel and multi-callable are used for both calls, so the
        second call only succeeds if the channel reconnects to the new
        server instance.
        """
        server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
            grpc.unary_unary_rpc_method_handler(_handle_unary_unary)
        })
        sock_opt = _get_reuse_socket_option()
        port = _pick_and_bind_port(sock_opt)
        self.assertIsNotNone(port)

        server = grpc.server(server_pool, (handler,))
        server.add_insecure_port('[::]:{}'.format(port))
        server.start()
        channel = None
        # The try/finally guarantees that a failed assertion or RPC error
        # does not leave a running server or an open channel behind.
        try:
            channel = grpc.insecure_channel('localhost:%d' % port)
            multi_callable = channel.unary_unary(_UNARY_UNARY)
            self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
            server.stop(None)
            server = None  # already stopped; nothing for cleanup to do
            # By default, the channel connectivity is checked every 5s
            # GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS can be set to change
            # this.
            time.sleep(5.1)
            server = grpc.server(server_pool, (handler,))
            server.add_insecure_port('[::]:{}'.format(port))
            server.start()
            self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
        finally:
            # Same teardown order as the success path always had: server
            # first, then the channel.
            if server is not None:
                server.stop(None)
            if channel is not None:
                channel.close()


# When executed directly as a script, set up default logging and run this
# module's tests with verbose output.
if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)