From 0e50b94b5be32e7bd0bde6fab5ae1997ae9f2464 Mon Sep 17 00:00:00 2001 From: Dan Zhang Date: Wed, 10 Jan 2018 15:45:39 -0500 Subject: Adjust receiv buffer via setsockopt for udp_server's listening socket. Since this socket is used for all incoming traffic, its current buffer 1MB is appearantly too small. Change it to 10 MB for now. --- src/core/lib/iomgr/udp_server.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core/lib/iomgr') diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 4a97f3353d..6dde7b9611 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -283,8 +283,8 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, const grpc_resolved_address* addr) { grpc_resolved_address sockname_temp; struct sockaddr* addr_ptr = (struct sockaddr*)addr->addr; - /* Set send/receive socket buffers to 1 MB */ - int buffer_size_bytes = 1024 * 1024; + /* Set send/receive socket buffers to 10 MB */ + int buffer_size_bytes = 1024 * 1024 * 10; if (fd < 0) { goto error; -- cgit v1.2.3 From 9ee9c924d8f23604a8ab78089b1706b5b00e971a Mon Sep 17 00:00:00 2001 From: Dan Zhang Date: Wed, 10 Jan 2018 18:04:25 -0500 Subject: change to pass in value --- src/core/lib/iomgr/udp_server.cc | 38 +++++++++++++++++++++++++------------- src/core/lib/iomgr/udp_server.h | 1 + test/core/iomgr/udp_server_test.cc | 23 +++++++++++++++-------- 3 files changed, 41 insertions(+), 21 deletions(-) (limited to 'src/core/lib/iomgr') diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 6dde7b9611..8deb0ea544 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -280,11 +280,10 @@ static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, /* Prepare a recently-created socket for listening. */ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, - const grpc_resolved_address* addr) { + const grpc_resolved_address* addr, + size_t rcv_buf_size, size_t snd_buf_size) { grpc_resolved_address sockname_temp; struct sockaddr* addr_ptr = (struct sockaddr*)addr->addr; - /* Set send/receive socket buffers to 10 MB */ - int buffer_size_bytes = 1024 * 1024 * 10; if (fd < 0) { goto error; @@ -325,18 +324,25 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, goto error; } - if (grpc_set_socket_sndbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes", - buffer_size_bytes); + if (grpc_set_socket_sndbuf(fd, snd_buf_size) != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Failed to set send buffer size to %lu bytes", + snd_buf_size); goto error; } - if (grpc_set_socket_rcvbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes", - buffer_size_bytes); + if (grpc_set_socket_rcvbuf(fd, rcv_buf_size) != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Failed to set receive buffer size to %lu bytes", + rcv_buf_size); goto error; } + { + int get_overflow = 1; + if (0 != setsockopt(fd, SOL_SOCKET, SO_RXQ_OVFL, &get_overflow, + sizeof(get_overflow))) { + gpr_log(GPR_INFO, "Failed to set socket overflow support"); + } + } return grpc_sockaddr_get_port(&sockname_temp); error: @@ -451,6 +457,8 @@ static void on_write(void* arg, grpc_error* error) { static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, + size_t rcv_buf_size, + size_t snd_buf_size, grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, @@ -460,7 +468,8 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, char* addr_str; char* name; - port = prepare_socket(s->socket_factory, fd, addr); + port = + prepare_socket(s->socket_factory, fd, addr, rcv_buf_size, snd_buf_size); if (port >= 0) { grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); @@ -495,6 +504,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + size_t rcv_buf_size, size_t snd_buf_size, grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, @@ -545,8 +555,9 @@ int grpc_udp_server_add_port(grpc_udp_server* s, // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb, - write_cb, orphan_cb); + allocated_port1 = + add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb, + read_cb, write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -569,7 +580,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr4_copy; } allocated_port2 = - add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, rcv_buf_size, snd_buf_size, start_cb, + read_cb, write_cb, orphan_cb); done: gpr_free(allocated_addr); diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index a469ab9be5..ec18716f39 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -68,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + size_t rcv_buf_size, size_t snd_buf_size, grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index dc1248bc1c..5245840ba9 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -51,6 +51,9 @@ static int g_number_of_bytes_read = 0; static int g_number_of_orphan_calls = 0; static int g_number_of_starts = 0; +size_t rcv_buf_size = 1024; +size_t snd_buf_size = 1024; + static void on_start(grpc_fd* emfd, void* user_data) { g_number_of_starts++; } static bool on_read(grpc_fd* emfd) { @@ -177,8 +180,9 @@ static void test_no_op_with_port(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, - on_write, on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, + snd_buf_size, on_start, on_read, on_write, + on_fd_orphaned)); grpc_udp_server_destroy(s, nullptr); @@ -207,8 +211,9 @@ static void test_no_op_with_port_and_socket_factory(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, - on_write, on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, + snd_buf_size, on_start, on_read, on_write, + on_fd_orphaned)); GPR_ASSERT(socket_factory->number_of_socket_calls == 1); GPR_ASSERT(socket_factory->number_of_bind_calls == 1); @@ -233,8 +238,9 @@ static void test_no_op_with_port_and_start(void) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_in); addr->sin_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, - on_write, on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, + snd_buf_size, on_start, on_read, on_write, + on_fd_orphaned)); grpc_udp_server_start(s, nullptr, 0, nullptr); GPR_ASSERT(g_number_of_starts == 1); @@ -265,8 +271,9 @@ static void test_receive(int number_of_clients) { memset(&resolved_addr, 0, sizeof(resolved_addr)); resolved_addr.len = sizeof(struct sockaddr_storage); addr->ss_family = AF_INET; - GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, on_start, on_read, - on_write, on_fd_orphaned)); + GPR_ASSERT(grpc_udp_server_add_port(s, &resolved_addr, rcv_buf_size, + snd_buf_size, on_start, on_read, on_write, + on_fd_orphaned)); svrfd = grpc_udp_server_get_fd(s, 0); GPR_ASSERT(svrfd >= 0); -- cgit v1.2.3 From a6efb9cec7a81b6d2a26d1c5466458b432ebfbd9 Mon Sep 17 00:00:00 2001 From: Dan Zhang Date: Thu, 11 Jan 2018 11:01:30 -0500 Subject: fix portability errors --- src/core/lib/iomgr/udp_server.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'src/core/lib/iomgr') diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 8deb0ea544..2df10ee09a 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -21,6 +21,10 @@ #define _GNU_SOURCE #endif +#ifndef SO_RXQ_OVFL +#define SO_RXQ_OVFL 40 +#endif + #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET @@ -325,13 +329,13 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, } if (grpc_set_socket_sndbuf(fd, snd_buf_size) != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Failed to set send buffer size to %lu bytes", + gpr_log(GPR_ERROR, "Failed to set send buffer size to %zd bytes", snd_buf_size); goto error; } if (grpc_set_socket_rcvbuf(fd, rcv_buf_size) != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Failed to set receive buffer size to %lu bytes", + gpr_log(GPR_ERROR, "Failed to set receive buffer size to %zd bytes", rcv_buf_size); goto error; } @@ -457,8 +461,7 @@ static void on_write(void* arg, grpc_error* error) { static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, - size_t rcv_buf_size, - size_t snd_buf_size, + size_t rcv_buf_size, size_t snd_buf_size, grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, -- cgit v1.2.3 From 3742b724c9deac411b51dfc7b1ac6818f61550e3 Mon Sep 17 00:00:00 2001 From: Dan Zhang Date: Thu, 11 Jan 2018 13:02:38 -0500 Subject: change to int type --- src/core/lib/iomgr/udp_server.cc | 6 +++--- src/core/lib/iomgr/udp_server.h | 2 +- test/core/iomgr/udp_server_test.cc | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) (limited to 'src/core/lib/iomgr') diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 2df10ee09a..a9a3f3aba5 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -285,7 +285,7 @@ static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, /* Prepare a recently-created socket for listening. */ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, const grpc_resolved_address* addr, - size_t rcv_buf_size, size_t snd_buf_size) { + int rcv_buf_size, int snd_buf_size) { grpc_resolved_address sockname_temp; struct sockaddr* addr_ptr = (struct sockaddr*)addr->addr; @@ -461,7 +461,7 @@ static void on_write(void* arg, grpc_error* error) { static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, - size_t rcv_buf_size, size_t snd_buf_size, + int rcv_buf_size, int snd_buf_size, grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, @@ -507,7 +507,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, - size_t rcv_buf_size, size_t snd_buf_size, + int rcv_buf_size, int snd_buf_size, grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index ec18716f39..c1aa49f15d 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -68,7 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, - size_t rcv_buf_size, size_t snd_buf_size, + int rcv_buf_size, int snd_buf_size, grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, diff --git a/test/core/iomgr/udp_server_test.cc b/test/core/iomgr/udp_server_test.cc index 5245840ba9..09f0283013 100644 --- a/test/core/iomgr/udp_server_test.cc +++ b/test/core/iomgr/udp_server_test.cc @@ -51,8 +51,8 @@ static int g_number_of_bytes_read = 0; static int g_number_of_orphan_calls = 0; static int g_number_of_starts = 0; -size_t rcv_buf_size = 1024; -size_t snd_buf_size = 1024; +int rcv_buf_size = 1024; +int snd_buf_size = 1024; static void on_start(grpc_fd* emfd, void* user_data) { g_number_of_starts++; } -- cgit v1.2.3 From e2a6ca835c5163075a11f0c1e78fb98378298eb7 Mon Sep 17 00:00:00 2001 From: Dan Zhang Date: Thu, 11 Jan 2018 13:06:38 -0500 Subject: %zd->%d --- src/core/lib/iomgr/udp_server.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core/lib/iomgr') diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index a9a3f3aba5..ec3b88d911 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -329,13 +329,13 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, } if (grpc_set_socket_sndbuf(fd, snd_buf_size) != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Failed to set send buffer size to %zd bytes", + gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes", snd_buf_size); goto error; } if (grpc_set_socket_rcvbuf(fd, rcv_buf_size) != GRPC_ERROR_NONE) { - gpr_log(GPR_ERROR, "Failed to set receive buffer size to %zd bytes", + gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes", rcv_buf_size); goto error; } -- cgit v1.2.3 From a06720bab85239e34bc42caefd6191284061a311 Mon Sep 17 00:00:00 2001 From: Dan Zhang Date: Thu, 11 Jan 2018 13:20:57 -0500 Subject: format --- src/core/lib/iomgr/udp_server.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core/lib/iomgr') diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index ec3b88d911..946846a8b8 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -284,8 +284,8 @@ static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, /* Prepare a recently-created socket for listening. */ static int prepare_socket(grpc_socket_factory* socket_factory, int fd, - const grpc_resolved_address* addr, - int rcv_buf_size, int snd_buf_size) { + const grpc_resolved_address* addr, int rcv_buf_size, + int snd_buf_size) { grpc_resolved_address sockname_temp; struct sockaddr* addr_ptr = (struct sockaddr*)addr->addr; -- cgit v1.2.3