diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-04-07 14:32:15 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-04-07 16:20:48 -0700 |
commit | aec96aa2232b3ab7f86bc08a99f82684406edf0b (patch) | |
tree | bfba274bab64544dafb29256788138be5039f965 /src/core/iomgr | |
parent | 106882854b40775ac570fbf307ad978e808d8e62 (diff) |
Fix server shutdown
A previous fix to make close() occur later can cause socket reuse by servers to fail as previous sockets are left asynchronously open.
This change:
- adds a callback to TCP server shutdown to signal that the server is completely shutdown
- wait for that callback before destroying listeners in the server (and before destroying the server)
- handles fallout
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/tcp_server.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 58 |
2 files changed, 52 insertions, 10 deletions
diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 68ee85c5a7..1e58901a7a 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -71,6 +71,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, up when grpc_tcp_server_destroy is called. */ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index); -void grpc_tcp_server_destroy(grpc_tcp_server *server); +void grpc_tcp_server_destroy(grpc_tcp_server *server, + void (*shutdown_done)(void *shutdown_done_arg), + void *shutdown_done_arg); #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 90b7eb451d..482166e2eb 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -102,12 +102,18 @@ struct grpc_tcp_server { gpr_cv cv; /* active port count: how many ports are actually still listening */ - int active_ports; + size_t active_ports; + /* destroyed port count: how many ports are completely destroyed */ + size_t destroyed_ports; /* all listening ports */ server_port *ports; size_t nports; size_t port_capacity; + + /* shutdown callback */ + void (*shutdown_complete)(void *); + void *shutdown_complete_arg; }; grpc_tcp_server *grpc_tcp_server_create(void) { @@ -115,6 +121,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) { gpr_mu_init(&s->mu); gpr_cv_init(&s->cv); s->active_ports = 0; + s->destroyed_ports = 0; s->cb = NULL; s->cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); @@ -123,29 +130,62 @@ grpc_tcp_server *grpc_tcp_server_create(void) { return s; } -void grpc_tcp_server_destroy(grpc_tcp_server *s) { +static void finish_shutdown(grpc_tcp_server *s) { + s->shutdown_complete(s->shutdown_complete_arg); + + gpr_mu_destroy(&s->mu); + gpr_cv_destroy(&s->cv); + + gpr_free(s->ports); + gpr_free(s); +} + +static void destroyed_port(void *server, int success) { + grpc_tcp_server *s = server; + gpr_mu_lock(&s->mu); + s->destroyed_ports++; + if (s->destroyed_ports == s->nports) { + gpr_mu_unlock(&s->mu); + finish_shutdown(s); + } else { + gpr_mu_unlock(&s->mu); + } +} + +static void dont_care_about_shutdown_completion(void *ignored) {} + +void grpc_tcp_server_destroy(grpc_tcp_server *s, + void (*shutdown_complete)(void *shutdown_complete_arg), + void *shutdown_complete_arg) { size_t i; gpr_mu_lock(&s->mu); + + s->shutdown_complete = shutdown_complete ? shutdown_complete : dont_care_about_shutdown_completion; + s->shutdown_complete_arg = shutdown_complete_arg; + /* shutdown all fd's */ for (i = 0; i < s->nports; i++) { grpc_fd_shutdown(s->ports[i].emfd); } /* wait while that happens */ + /* TODO(ctiller): make this asynchronous also */ while (s->active_ports) { gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future); } gpr_mu_unlock(&s->mu); /* delete ALL the things */ - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; - if (sp->addr.sockaddr.sa_family == AF_UNIX) { - unlink_if_unix_domain_socket(&sp->addr.un); + if (s->nports) { + for (i = 0; i < s->nports; i++) { + server_port *sp = &s->ports[i]; + if (sp->addr.sockaddr.sa_family == AF_UNIX) { + unlink_if_unix_domain_socket(&sp->addr.un); + } + grpc_fd_orphan(sp->emfd, destroyed_port, s); } - grpc_fd_orphan(sp->emfd, NULL, NULL); + } else { + finish_shutdown(s); } - gpr_free(s->ports); - gpr_free(s); } /* get max listen queue size on linux */ |