From 128d3ff3614a76ac146891cf81b4d8df5c9ea064 Mon Sep 17 00:00:00 2001 From: Robbie Shade Date: Tue, 2 Aug 2016 15:23:20 -0400 Subject: Change udp_server to use linked list of ports --- src/core/lib/iomgr/udp_server.c | 90 ++++++++++++++++++++++++----------------- 1 file changed, 52 insertions(+), 38 deletions(-) (limited to 'src/core/lib/iomgr') diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 48032412a2..90fae6b94f 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -67,10 +67,9 @@ #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/support/string.h" -#define INIT_PORT_CAP 2 - /* one listening port */ -typedef struct { +typedef struct grpc_udp_listener grpc_udp_listener; +struct grpc_udp_listener { int fd; grpc_fd *emfd; grpc_udp_server *server; @@ -83,7 +82,9 @@ typedef struct { grpc_closure destroyed_closure; grpc_udp_server_read_cb read_cb; grpc_udp_server_orphan_cb orphan_cb; -} server_port; + + struct grpc_udp_listener *next; +}; /* the overall server */ struct grpc_udp_server { @@ -98,10 +99,10 @@ struct grpc_udp_server { /* is this server shutting down? (boolean) */ int shutdown; - /* all listening ports */ - server_port *ports; - size_t nports; - size_t port_capacity; + /* linked list of server ports */ + grpc_udp_listener *head; + grpc_udp_listener *tail; + unsigned nports; /* shutdown callback */ grpc_closure *shutdown_complete; @@ -121,9 +122,9 @@ grpc_udp_server *grpc_udp_server_create(void) { s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; - s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); + s->head = NULL; + s->tail = NULL; s->nports = 0; - s->port_capacity = INIT_PORT_CAP; return s; } @@ -131,10 +132,16 @@ grpc_udp_server *grpc_udp_server_create(void) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); - gpr_mu_destroy(&s->mu); gpr_cv_destroy(&s->cv); + gpr_mu_destroy(&s->mu); + + while (s->head) { + grpc_udp_listener *sp = s->head; + s->head = sp->next; + + gpr_free(sp); + } - gpr_free(s->ports); gpr_free(s); } @@ -165,9 +172,9 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { return; } - if (s->nports) { - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; + if (s->head) { + grpc_udp_listener *sp; + for (sp = s->head; sp; sp = sp->next) { sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; @@ -187,6 +194,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_closure *on_done) { size_t i; + grpc_udp_listener* sp; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -196,8 +204,10 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, /* shutdown all fd's */ if (s->active_ports) { - for (i = 0; i < s->nports; i++) { - grpc_fd_shutdown(exec_ctx, s->ports[i].emfd); + for (sp = s->head; sp; sp = sp->next) { + GPR_ASSERT(sp->orphan_cb); + sp->orphan_cb(sp->emfd); + grpc_fd_shutdown(exec_ctx, sp->emfd); } gpr_mu_unlock(&s->mu); } else { @@ -274,10 +284,9 @@ error: /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - server_port *sp = arg; + grpc_udp_listener *sp = arg; if (error != GRPC_ERROR_NONE) { - gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); deactivated_all_ports(exec_ctx, sp->server); @@ -299,7 +308,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, const struct sockaddr *addr, size_t addr_len, grpc_udp_server_read_cb read_cb, grpc_udp_server_orphan_cb orphan_cb) { - server_port *sp; + grpc_udp_listener *sp; int port; char *addr_str; char *name; @@ -310,12 +319,15 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_free(addr_str); gpr_mu_lock(&s->mu); - /* append it to the list under a lock */ - if (s->nports == s->port_capacity) { - s->port_capacity *= 2; - s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity); + s->nports++; + sp = gpr_malloc(sizeof(grpc_udp_listener)); + sp->next = NULL; + if (s->head == NULL) { + s->head = sp; + } else { + s->tail->next = sp; } - sp = &s->ports[s->nports++]; + s->tail = sp; sp->server = s; sp->fd = fd; sp->emfd = grpc_fd_create(fd, name); @@ -334,6 +346,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, size_t addr_len, grpc_udp_server_read_cb read_cb, grpc_udp_server_orphan_cb orphan_cb) { + grpc_udp_listener* sp; int allocated_port1 = -1; int allocated_port2 = -1; unsigned i; @@ -351,9 +364,9 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ if (grpc_sockaddr_get_port(addr) == 0) { - for (i = 0; i < s->nports; i++) { + for (sp = s->head; sp; sp = sp->next) { sockname_len = sizeof(sockname_temp); - if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp, + if (0 == getsockname(sp->fd, (struct sockaddr *)&sockname_temp, &sockname_len)) { port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); if (port > 0) { @@ -413,28 +426,29 @@ done: return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; } -int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { - return (port_index < s->nports) ? s->ports[port_index].fd : -1; -} - void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_pollset **pollsets, size_t pollset_count, grpc_server *server) { size_t i, j; gpr_mu_lock(&s->mu); + grpc_udp_listener *sp; GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; s->grpc_server = server; - for (i = 0; i < s->nports; i++) { - for (j = 0; j < pollset_count; j++) { - grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd); + + sp = s->head; + while (sp != NULL) { + for (i = 0; i < pollset_count; i++) { + grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); } - s->ports[i].read_closure.cb = on_read; - s->ports[i].read_closure.cb_arg = &s->ports[i]; - grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd, - &s->ports[i].read_closure); + sp->read_closure.cb = on_read; + sp->read_closure.cb_arg = sp; + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + s->active_ports++; + sp = sp->next; } + gpr_mu_unlock(&s->mu); } -- cgit v1.2.3 From 956f1d31690e816cff311e01dac9278cde507785 Mon Sep 17 00:00:00 2001 From: Robbie Shade Date: Tue, 2 Aug 2016 16:55:00 -0400 Subject: Refactor udp_server to use a linked list of ports --- src/core/lib/iomgr/udp_server.c | 30 ++++++++++++++++++++---------- src/core/lib/iomgr/udp_server.h | 2 +- test/core/iomgr/udp_server_test.c | 4 ++-- 3 files changed, 23 insertions(+), 13 deletions(-) (limited to 'src/core/lib/iomgr') diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 90fae6b94f..bdf26d4097 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -89,7 +89,6 @@ struct grpc_udp_listener { /* the overall server */ struct grpc_udp_server { gpr_mu mu; - gpr_cv cv; /* active port count: how many ports are actually still listening */ size_t active_ports; @@ -118,7 +117,6 @@ struct grpc_udp_server { grpc_udp_server *grpc_udp_server_create(void) { grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); gpr_mu_init(&s->mu); - gpr_cv_init(&s->cv); s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; @@ -130,15 +128,15 @@ grpc_udp_server *grpc_udp_server_create(void) { } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { - grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + if (s->shutdown_complete != NULL) { + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + } - gpr_cv_destroy(&s->cv); gpr_mu_destroy(&s->mu); while (s->head) { grpc_udp_listener *sp = s->head; s->head = sp->next; - gpr_free(sp); } @@ -162,8 +160,6 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, events will be received on them - at this point it's safe to destroy things */ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { - size_t i; - /* delete ALL the things */ gpr_mu_lock(&s->mu); @@ -175,6 +171,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { if (s->head) { grpc_udp_listener *sp; for (sp = s->head; sp; sp = sp->next) { + // grpc_unlink_if_unix_domain_socket(&sp->addr.sockaddr); + sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; @@ -193,7 +191,6 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_closure *on_done) { - size_t i; grpc_udp_listener* sp; gpr_mu_lock(&s->mu); @@ -286,6 +283,7 @@ error: static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_udp_listener *sp = arg; + gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); @@ -302,6 +300,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + gpr_mu_unlock(&sp->server->mu); } static int add_socket_to_server(grpc_udp_server *s, int fd, @@ -349,7 +348,6 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, grpc_udp_listener* sp; int allocated_port1 = -1; int allocated_port2 = -1; - unsigned i; int fd; grpc_dualstack_mode dsmode; struct sockaddr_in6 addr6_v4mapped; @@ -426,10 +424,22 @@ done: return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; } +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { + grpc_udp_listener *sp; + if (port_index >= s->nports) { + return -1; + } + + for (sp = s->head; sp && port_index != 0; sp = sp->next) { + --port_index; + } + return sp->fd; +} + void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_pollset **pollsets, size_t pollset_count, grpc_server *server) { - size_t i, j; + size_t i; gpr_mu_lock(&s->mu); grpc_udp_listener *sp; GPR_ASSERT(s->active_ports == 0); diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 33c5ce11cd..70d0f19454 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -59,7 +59,7 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, grpc_pollset **pollsets, size_t pollset_count, struct grpc_server *server); -int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index); /* Add a port to the server, returning port number on success, or negative on failure. diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index a959a7e07f..6667581ad1 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -134,7 +134,7 @@ static void test_no_op_with_port_and_start(void) { grpc_exec_ctx_finish(&exec_ctx); /* The server had a single FD, which should have been orphaned. */ - GPR_ASSERT(g_number_of_orphan_calls == 1); + GPR_ASSERT(g_number_of_orphan_calls == 2); } static void test_receive(int number_of_clients) { @@ -199,7 +199,7 @@ static void test_receive(int number_of_clients) { grpc_exec_ctx_finish(&exec_ctx); /* The server had a single FD, which should have been orphaned. */ - GPR_ASSERT(g_number_of_orphan_calls == 1); + GPR_ASSERT(g_number_of_orphan_calls == 2); } static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, -- cgit v1.2.3 From fff290d88b1d6f45aaf48f3f17df1f97bb8a0890 Mon Sep 17 00:00:00 2001 From: Robbie Shade Date: Wed, 12 Oct 2016 11:49:44 -0400 Subject: Uncomment accidentally commented line --- src/core/lib/iomgr/udp_server.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/lib/iomgr') diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index e84a13b338..6216715a79 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -170,7 +170,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { if (s->head) { grpc_udp_listener *sp; for (sp = s->head; sp; sp = sp->next) { - // grpc_unlink_if_unix_domain_socket(&sp->addr.sockaddr); + grpc_unlink_if_unix_domain_socket(&sp->addr.sockaddr); sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; -- cgit v1.2.3 From cbd1b9758779110ecf8057611f652fbf4d519101 Mon Sep 17 00:00:00 2001 From: Robbie Shade Date: Wed, 12 Oct 2016 11:54:10 -0400 Subject: Add needed include --- src/core/lib/iomgr/udp_server.c | 1 + 1 file changed, 1 insertion(+) (limited to 'src/core/lib/iomgr') diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 6216715a79..9c842db04b 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -64,6 +64,7 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" +#include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" /* one listening port */ -- cgit v1.2.3 From 7433e5d12c3d9697ef2bf401fe52e7c7af7c5ca8 Mon Sep 17 00:00:00 2001 From: Robbie Shade Date: Thu, 20 Oct 2016 16:23:16 -0400 Subject: clang format --- src/core/lib/iomgr/udp_server.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/core/lib/iomgr') diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 9c842db04b..20eae7cbdd 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -193,7 +193,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_closure *on_done) { - grpc_udp_listener* sp; + grpc_udp_listener *sp; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -347,7 +347,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, size_t addr_len, grpc_udp_server_read_cb read_cb, grpc_udp_server_orphan_cb orphan_cb) { - grpc_udp_listener* sp; + grpc_udp_listener *sp; int allocated_port1 = -1; int allocated_port2 = -1; int fd; -- cgit v1.2.3