1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests that a channel will reconnect if a connection is dropped"""
import socket
import time
import logging
import unittest
import grpc
from grpc.framework.foundation import logging_pool
from tests.unit.framework.common import test_constants
# Request payload the client sends on every call made by the test.
_REQUEST = b'\x00\x00\x00'
# Payload returned by _handle_unary_unary; the test asserts that each call
# yields exactly this value.
_RESPONSE = b'\x00\x00\x01'
# Method path handed to channel.unary_unary(); it names the 'UnaryUnary'
# method of the 'test' generic handler registered on the server.
_UNARY_UNARY = '/test/UnaryUnary'
def _handle_unary_unary(unused_request, unused_servicer_context):
    """Serve a unary-unary call by replying with the fixed _RESPONSE payload.

    Both arguments are ignored; every call gets the same canned answer.
    """
    return _RESPONSE
def _get_reuse_socket_option():
    """Return the socket option used to allow re-binding a port.

    Returns:
      socket.SO_REUSEPORT when the socket module defines it, otherwise
      socket.SO_REUSEADDR.
    """
    # SO_REUSEPORT is unavailable on Windows, but SO_REUSEADDR
    # allows forcibly re-binding to a port
    return getattr(socket, 'SO_REUSEPORT', socket.SO_REUSEADDR)
def _pick_and_bind_port(sock_opt):
    """Pick a TCP port on 'localhost' that binds on each usable address family.

    The first usable family (IPv6 is tried before IPv4) is bound to port 0 so
    that the OS assigns a port; the following family is then probed with that
    same port number. Every probe socket is closed before returning, so the
    port is only checked here, not kept open.

    Args:
      sock_opt: SOL_SOCKET-level option (see _get_reuse_socket_option) that is
        switched on for every probe socket before it is bound.

    Returns:
      The chosen port number, or None when no address family could be bound
      at all.

    Raises:
      socket.error: if setting sock_opt on a probe socket fails.
    """
    port = 0
    # True once any family has been bound. Tracking this explicitly avoids
    # inspecting a socket variable that is never assigned when socket creation
    # fails for every family.
    bound_any = False
    for address_family in (socket.AF_INET6, socket.AF_INET):
        try:
            s = socket.socket(address_family, socket.SOCK_STREAM)
        except socket.error:
            continue  # this address family is unavailable
        try:
            # Inside the try/finally so the socket is closed even if the
            # option is rejected; the error itself still propagates.
            s.setsockopt(socket.SOL_SOCKET, sock_opt, 1)
            try:
                s.bind(('localhost', port))
                # for socket.SOCK_STREAM sockets, it is necessary to call
                # listen to get the desired behavior.
                s.listen(1)
                port = s.getsockname()[1]
                bound_any = True
            except socket.error:
                if port == 0:
                    # Not even an OS-assigned port works here, so the family
                    # itself is unusable (e.g. socket creation succeeds but
                    # 'localhost' cannot be bound on it). Retrying would fail
                    # the same way; move on to the next family instead.
                    continue
                # the port picked on the previous family is not available on
                # this one: drop it and start over with a fresh port
                port = 0
                break
        finally:
            s.close()
    if port != 0:
        return port
    if bound_any:
        return _pick_and_bind_port(sock_opt)
    return None  # no address family was available
class ReconnectTest(unittest.TestCase):
    """Checks that a channel keeps working after its server is restarted."""

    def _start_server(self, server_pool, handler, port):
        """Create, bind and start a server for `handler` on `port`.

        Returns:
          The started server; the caller is responsible for stopping it.
        """
        server = grpc.server(server_pool, (handler,))
        server.add_insecure_port('[::]:{}'.format(port))
        server.start()
        return server

    def test_reconnect(self):
        """Call once, restart the server on the same port, then call again.

        The same channel and multi-callable are used for both calls, so the
        second call only succeeds if the channel re-established a connection.
        """
        server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
                grpc.unary_unary_rpc_method_handler(_handle_unary_unary)
        })
        sock_opt = _get_reuse_socket_option()
        port = _pick_and_bind_port(sock_opt)
        self.assertIsNotNone(port)

        server = self._start_server(server_pool, handler, port)
        channel = None
        # Teardown lives in `finally` so that a failed assertion or RPC error
        # does not leave a running server or an open channel behind.
        try:
            channel = grpc.insecure_channel('localhost:%d' % port)
            multi_callable = channel.unary_unary(_UNARY_UNARY)
            self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
            server.stop(None)
            # Nothing is running during the pause; make sure `finally` does
            # not stop the old server a second time.
            server = None
            # By default, the channel connectivity is checked every 5s
            # GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS can be set to change
            # this.
            time.sleep(5.1)
            server = self._start_server(server_pool, handler, port)
            self.assertEqual(_RESPONSE, multi_callable(_REQUEST))
        finally:
            if server is not None:
                server.stop(None)
            if channel is not None:
                channel.close()
if __name__ == '__main__':
    # Script entry point: set up default logging first, then hand control to
    # the unittest runner with verbose output.
    logging.basicConfig()
    unittest.main(verbosity=2)
|