aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Dan Zhang <danzh@google.com>2018-04-12 11:15:57 -0400
committerGravatar Dan Zhang <danzh@google.com>2018-04-12 11:15:57 -0400
commit8c2314093b33458c2cc1f003524c5f09306fae82 (patch)
treee20cfa39dbb88d428502d4c43612dfb7a665e7a7 /src/core/lib/iomgr
parent074439e7d72837247e1300433fd7fddd97f99b67 (diff)
Allow udp_server to create multiple listeners for each port via SO_REUSEPORT
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.cc24
-rw-r--r--src/core/lib/iomgr/socket_utils_posix.h3
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc27
-rw-r--r--src/core/lib/iomgr/udp_server.cc153
-rw-r--r--src/core/lib/iomgr/udp_server.h12
5 files changed, 142 insertions, 77 deletions
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc
index c52e237fa8..8ef1f4f4a2 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.cc
+++ b/src/core/lib/iomgr/socket_utils_common_posix.cc
@@ -181,6 +181,30 @@ grpc_error* grpc_set_socket_reuse_port(int fd, int reuse) {
#endif
}
+static gpr_once g_probe_so_reuesport_once = GPR_ONCE_INIT;
+static int g_support_so_reuseport = false;
+
+void probe_so_reuseport_once(void) {
+#ifndef GPR_MANYLINUX1
+ int s = socket(AF_INET, SOCK_STREAM, 0);
+ if (s < 0) {
+ /* This might be an ipv6-only environment in which case 'socket(AF_INET,..)'
+ call would fail. Try creating IPv6 socket in that case */
+ s = socket(AF_INET6, SOCK_STREAM, 0);
+ }
+ if (s >= 0) {
+ g_support_so_reuseport = GRPC_LOG_IF_ERROR("check for SO_REUSEPORT",
+ grpc_set_socket_reuse_port(s, 1));
+ close(s);
+ }
+#endif
+}
+
+bool grpc_is_socket_reuse_port_supported() {
+ gpr_once_init(&g_probe_so_reuesport_once, probe_so_reuseport_once);
+ return g_support_so_reuseport;
+}
+
/* disable nagle */
grpc_error* grpc_set_socket_low_latency(int fd, int low_latency) {
int val = (low_latency != 0);
diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h
index 1f50e8d315..b3fd58a530 100644
--- a/src/core/lib/iomgr/socket_utils_posix.h
+++ b/src/core/lib/iomgr/socket_utils_posix.h
@@ -44,6 +44,9 @@ grpc_error* grpc_set_socket_cloexec(int fd, int close_on_exec);
/* set a socket to reuse old addresses */
grpc_error* grpc_set_socket_reuse_addr(int fd, int reuse);
+/* return true if SO_REUSEPORT is supported */
+bool grpc_is_socket_reuse_port_supported();
+
/* disable nagle */
grpc_error* grpc_set_socket_low_latency(int fd, int low_latency);
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 4e1d90e86a..f11b82f7ab 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -55,39 +55,18 @@
#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
-static gpr_once check_init = GPR_ONCE_INIT;
-static bool has_so_reuseport = false;
-
-static void init(void) {
-#ifndef GPR_MANYLINUX1
- int s = socket(AF_INET, SOCK_STREAM, 0);
- if (s < 0) {
- /* This might be an ipv6-only environment in which case 'socket(AF_INET,..)'
- call would fail. Try creating IPv6 socket in that case */
- s = socket(AF_INET6, SOCK_STREAM, 0);
- }
- if (s >= 0) {
- has_so_reuseport = GRPC_LOG_IF_ERROR("check for SO_REUSEPORT",
- grpc_set_socket_reuse_port(s, 1));
- close(s);
- }
-#endif
-}
-
static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
const grpc_channel_args* args,
grpc_tcp_server** server) {
- gpr_once_init(&check_init, init);
-
grpc_tcp_server* s =
static_cast<grpc_tcp_server*>(gpr_zalloc(sizeof(grpc_tcp_server)));
- s->so_reuseport = has_so_reuseport;
+ s->so_reuseport = grpc_is_socket_reuse_port_supported();
s->expand_wildcard_addrs = false;
for (size_t i = 0; i < (args == nullptr ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_INTEGER) {
- s->so_reuseport =
- has_so_reuseport && (args->args[i].value.integer != 0);
+ s->so_reuseport = grpc_is_socket_reuse_port_supported() &&
+ (args->args[i].value.integer != 0);
} else {
gpr_free(s);
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 9990deec7a..51d17eb174 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -191,6 +191,9 @@ struct grpc_udp_server {
size_t pollset_count;
/* opaque object to pass to callbacks */
void* user_data;
+
+ /* latch has_so_reuseport during server creation */
+ bool so_reuseport;
};
static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) {
@@ -214,6 +217,7 @@ grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) {
s->active_ports = 0;
s->destroyed_ports = 0;
s->shutdown = 0;
+ s->so_reuseport = grpc_is_socket_reuse_port_supported();
return s;
}
@@ -353,7 +357,7 @@ static int bind_socket(grpc_socket_factory* socket_factory, int sockfd,
/* Prepare a recently-created socket for listening. */
static int prepare_socket(grpc_socket_factory* socket_factory, int fd,
const grpc_resolved_address* addr, int rcv_buf_size,
- int snd_buf_size) {
+ int snd_buf_size, bool so_reuseport) {
grpc_resolved_address sockname_temp;
grpc_sockaddr* addr_ptr =
reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr));
@@ -381,21 +385,6 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd,
}
}
- if (bind_socket(socket_factory, fd, addr) < 0) {
- char* addr_str;
- grpc_sockaddr_to_string(&addr_str, addr, 0);
- gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
- gpr_free(addr_str);
- goto error;
- }
-
- sockname_temp.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
-
- if (getsockname(fd, reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr),
- &sockname_temp.len) < 0) {
- goto error;
- }
-
if (grpc_set_socket_sndbuf(fd, snd_buf_size) != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes",
snd_buf_size);
@@ -415,6 +404,30 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd,
gpr_log(GPR_INFO, "Failed to set socket overflow support");
}
}
+
+ if (so_reuseport && !grpc_is_unix_socket(addr) &&
+ grpc_set_socket_reuse_port(fd, 1) != GRPC_ERROR_NONE) {
+ gpr_log(GPR_ERROR, "Failed to set SO_REUSEPORT for fd %d", fd);
+ goto error;
+ }
+
+ if (bind_socket(socket_factory, fd, addr) < 0) {
+ char* addr_str;
+ grpc_sockaddr_to_string(&addr_str, addr, 0);
+ gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
+ gpr_free(addr_str);
+ goto error;
+ }
+
+ sockname_temp.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
+
+ if (getsockname(fd, reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr),
+ &sockname_temp.len) < 0) {
+ gpr_log(GPR_ERROR, "Unable to get the address socket %d is bound to: %s",
+ fd, strerror(errno));
+ goto error;
+ }
+
return grpc_sockaddr_get_port(&sockname_temp);
error:
@@ -541,8 +554,8 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
int rcv_buf_size, int snd_buf_size) {
gpr_log(GPR_DEBUG, "add socket %d to server", fd);
- int port =
- prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, snd_buf_size);
+ int port = prepare_socket(s->socket_factory, fd, addr, rcv_buf_size,
+ snd_buf_size, s->so_reuseport);
if (port >= 0) {
gpr_mu_lock(&s->mu);
s->listeners.emplace_back(s, fd, addr);
@@ -557,7 +570,18 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
int rcv_buf_size, int snd_buf_size,
- GrpcUdpHandlerFactory* handler_factory) {
+ GrpcUdpHandlerFactory* handler_factory,
+ size_t num_listeners) {
+ if (num_listeners > 1 && !s->so_reuseport) {
+ gpr_log(GPR_ERROR,
+ "Try to have multiple listeners on same port, but SO_REUSEPORT is "
+ "not supported. Only create 1 listener.");
+ }
+ char* addr_str;
+ grpc_sockaddr_to_string(&addr_str, addr, 1);
+ gpr_log(GPR_DEBUG, "add address: %s to server", addr_str);
+ gpr_free(addr_str);
+
int allocated_port1 = -1;
int allocated_port2 = -1;
int fd;
@@ -568,11 +592,12 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
grpc_resolved_address addr4_copy;
grpc_resolved_address* allocated_addr = nullptr;
grpc_resolved_address sockname_temp;
- int port;
+ int port = 0;
/* Check if this is a wildcard port, and if so, try to keep the port the same
as some previously created listener. */
if (grpc_sockaddr_get_port(addr) == 0) {
+ /* Loop through existing listeners to find the port in use. */
for (size_t i = 0; i < s->listeners.size(); ++i) {
sockname_temp.len =
static_cast<socklen_t>(sizeof(struct sockaddr_storage));
@@ -581,6 +606,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
&sockname_temp.len)) {
port = grpc_sockaddr_get_port(&sockname_temp);
if (port > 0) {
+ /* Found such a port, update |addr| to reflects this port. */
allocated_addr = static_cast<grpc_resolved_address*>(
gpr_malloc(sizeof(grpc_resolved_address)));
memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
@@ -597,44 +623,73 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
}
s->handler_factory = handler_factory;
- /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
- if (grpc_sockaddr_is_wildcard(addr, &port)) {
- grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
+ for (size_t i = 0; i < num_listeners; ++i) {
+ /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
+ if (grpc_sockaddr_is_wildcard(addr, &port)) {
+ grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
+
+ /* Try listening on IPv6 first. */
+ addr = &wild6;
+ // TODO(rjshade): Test and propagate the returned grpc_error*:
+ GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
+ s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
+ allocated_port1 =
+ add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size);
+ if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
+ if (port == 0) {
+ /* This is the first time to bind to |addr|. If its port is still
+ * wildcard port, update |addr| with the ephermeral port returned by
+ * kernel. Thus |addr| can have a specific port in following
+ * iterations. */
+ grpc_sockaddr_set_port(addr, allocated_port1);
+ port = allocated_port1;
+ } else if (allocated_port1 >= 0) {
+ /* The following sucessfully created socket should have same port as
+ * the first one. */
+ GPR_ASSERT(port == allocated_port1);
+ }
+ /* A dualstack socket is created, no need to create corresponding IPV4
+ * socket. */
+ continue;
+ }
+
+ /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
+ if (port == 0 && allocated_port1 > 0) {
+ /* |port| hasn't been assigned to an emphemeral port yet, |wild4| must
+ * have a wildcard port. Update it with the emphemeral port created
+ * during binding.*/
+ grpc_sockaddr_set_port(&wild4, allocated_port1);
+ port = allocated_port1;
+ }
+ /* |wild4| should have been updated with an emphemeral port by now. Use
+ * this IPV4 address to create a IPV4 socket. */
+ addr = &wild4;
+ }
- /* Try listening on IPv6 first. */
- addr = &wild6;
// TODO(rjshade): Test and propagate the returned grpc_error*:
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
- allocated_port1 =
- add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size);
- if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
- goto done;
+ if (fd < 0) {
+ gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
}
-
- /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
- if (port == 0 && allocated_port1 > 0) {
- grpc_sockaddr_set_port(&wild4, allocated_port1);
+ if (dsmode == GRPC_DSMODE_IPV4 &&
+ grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
+ addr = &addr4_copy;
+ }
+ allocated_port2 =
+ add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size);
+ if (port == 0) {
+ /* Update |addr| with the ephermeral port returned by kernel. So |addr|
+ * can have a specific port in following iterations. */
+ grpc_sockaddr_set_port(addr, allocated_port2);
+ port = allocated_port2;
+ } else if (allocated_port2 >= 0) {
+ GPR_ASSERT(port == allocated_port2);
}
- addr = &wild4;
- }
-
- // TODO(rjshade): Test and propagate the returned grpc_error*:
- GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
- s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
- if (fd < 0) {
- gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
- }
- if (dsmode == GRPC_DSMODE_IPV4 &&
- grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
- addr = &addr4_copy;
}
- allocated_port2 =
- add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size);
-done:
gpr_free(allocated_addr);
- return allocated_port1 >= 0 ? allocated_port1 : allocated_port2;
+ return port;
}
int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index 4e384d2cdf..3656791c1f 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -86,17 +86,21 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index);
/* Add a port to the server, returning port number on success, or negative
on failure.
+ Create |num_listeners| sockets for given address to listen on using
+ SO_REUSEPORT if supported.
+
The :: and 0.0.0.0 wildcard addresses are treated identically, accepting
- both IPv4 and IPv6 connections, but :: is the preferred style. This usually
- creates one socket, but possibly two on systems which support IPv6,
- but not dualstack sockets. */
+ both IPv4 and IPv6 connections, but :: is the preferred style. This usually
+ creates |num_listeners| sockets, but possibly 2 * |num_listeners| on systems
+ which support IPv6, but not dualstack sockets. */
/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
int rcv_buf_size, int snd_buf_size,
- GrpcUdpHandlerFactory* handler_factory);
+ GrpcUdpHandlerFactory* handler_factory,
+ size_t num_listeners);
void grpc_udp_server_destroy(grpc_udp_server* server, grpc_closure* on_done);