aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-04-07 14:32:15 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-04-07 16:20:48 -0700
commitaec96aa2232b3ab7f86bc08a99f82684406edf0b (patch)
treebfba274bab64544dafb29256788138be5039f965 /src/core/iomgr
parent106882854b40775ac570fbf307ad978e808d8e62 (diff)
Fix server shutdown
A previous fix to make close() occur later can cause socket reuse by servers to fail as previous sockets are left asynchronously open. This change: - adds a callback to TCP server shutdown to signal that the server is completely shutdown - wait for that callback before destroying listeners in the server (and before destroying the server) - handles fallout
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/tcp_server.h4
-rw-r--r--src/core/iomgr/tcp_server_posix.c58
2 files changed, 52 insertions, 10 deletions
diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h
index 68ee85c5a7..1e58901a7a 100644
--- a/src/core/iomgr/tcp_server.h
+++ b/src/core/iomgr/tcp_server.h
@@ -71,6 +71,8 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr,
up when grpc_tcp_server_destroy is called. */
int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index);
-void grpc_tcp_server_destroy(grpc_tcp_server *server);
+void grpc_tcp_server_destroy(grpc_tcp_server *server,
+ void (*shutdown_done)(void *shutdown_done_arg),
+ void *shutdown_done_arg);
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 90b7eb451d..482166e2eb 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -102,12 +102,18 @@ struct grpc_tcp_server {
gpr_cv cv;
/* active port count: how many ports are actually still listening */
- int active_ports;
+ size_t active_ports;
+ /* destroyed port count: how many ports are completely destroyed */
+ size_t destroyed_ports;
/* all listening ports */
server_port *ports;
size_t nports;
size_t port_capacity;
+
+ /* shutdown callback */
+ void (*shutdown_complete)(void *);
+ void *shutdown_complete_arg;
};
grpc_tcp_server *grpc_tcp_server_create(void) {
@@ -115,6 +121,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
gpr_mu_init(&s->mu);
gpr_cv_init(&s->cv);
s->active_ports = 0;
+ s->destroyed_ports = 0;
s->cb = NULL;
s->cb_arg = NULL;
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
@@ -123,29 +130,62 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
return s;
}
-void grpc_tcp_server_destroy(grpc_tcp_server *s) {
+static void finish_shutdown(grpc_tcp_server *s) {
+ s->shutdown_complete(s->shutdown_complete_arg);
+
+ gpr_mu_destroy(&s->mu);
+ gpr_cv_destroy(&s->cv);
+
+ gpr_free(s->ports);
+ gpr_free(s);
+}
+
+static void destroyed_port(void *server, int success) {
+ grpc_tcp_server *s = server;
+ gpr_mu_lock(&s->mu);
+ s->destroyed_ports++;
+ if (s->destroyed_ports == s->nports) {
+ gpr_mu_unlock(&s->mu);
+ finish_shutdown(s);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ }
+}
+
+static void dont_care_about_shutdown_completion(void *ignored) {}
+
+void grpc_tcp_server_destroy(grpc_tcp_server *s,
+ void (*shutdown_complete)(void *shutdown_complete_arg),
+ void *shutdown_complete_arg) {
size_t i;
gpr_mu_lock(&s->mu);
+
+ s->shutdown_complete = shutdown_complete ? shutdown_complete : dont_care_about_shutdown_completion;
+ s->shutdown_complete_arg = shutdown_complete_arg;
+
/* shutdown all fd's */
for (i = 0; i < s->nports; i++) {
grpc_fd_shutdown(s->ports[i].emfd);
}
/* wait while that happens */
+ /* TODO(ctiller): make this asynchronous also */
while (s->active_ports) {
gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
}
gpr_mu_unlock(&s->mu);
/* delete ALL the things */
- for (i = 0; i < s->nports; i++) {
- server_port *sp = &s->ports[i];
- if (sp->addr.sockaddr.sa_family == AF_UNIX) {
- unlink_if_unix_domain_socket(&sp->addr.un);
+ if (s->nports) {
+ for (i = 0; i < s->nports; i++) {
+ server_port *sp = &s->ports[i];
+ if (sp->addr.sockaddr.sa_family == AF_UNIX) {
+ unlink_if_unix_domain_socket(&sp->addr.un);
+ }
+ grpc_fd_orphan(sp->emfd, destroyed_port, s);
}
- grpc_fd_orphan(sp->emfd, NULL, NULL);
+ } else {
+ finish_shutdown(s);
}
- gpr_free(s->ports);
- gpr_free(s);
}
/* get max listen queue size on linux */