aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/completion_queue.c7
-rw-r--r--src/core/surface/completion_queue.h2
-rw-r--r--src/core/surface/server.c44
-rw-r--r--src/core/surface/server.h2
-rw-r--r--src/core/surface/server_chttp2.c4
5 files changed, 52 insertions, 7 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 6a1d83ce5d..b08bd93693 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -432,3 +432,10 @@ void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) {
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return &cc->pollset;
}
+
+void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc) {
+ gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ grpc_pollset_kick(&cc->pollset);
+ grpc_pollset_work(&cc->pollset, gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
+ gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+}
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 3054264cad..3e9e2d186e 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -114,4 +114,6 @@ void grpc_cq_dump_pending_ops(grpc_completion_queue *cc);
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
+void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc);
+
#endif /* GRPC_INTERNAL_CORE_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 424734c54c..2e013ea742 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -137,6 +137,7 @@ struct grpc_server {
size_t cq_count;
gpr_mu mu;
+ gpr_cv cv;
registered_method *registered_methods;
requested_call_array requested_calls;
@@ -149,6 +150,7 @@ struct grpc_server {
channel_data root_channel_data;
listener *listeners;
+ int listeners_destroyed;
gpr_refcount internal_refcount;
};
@@ -263,6 +265,7 @@ static void server_unref(grpc_server *server) {
if (gpr_unref(&server->internal_refcount)) {
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu);
+ gpr_cv_destroy(&server->cv);
gpr_free(server->channel_filters);
requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) {
@@ -620,6 +623,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
if (cq) addcq(server, cq);
gpr_mu_init(&server->mu);
+ gpr_cv_init(&server->cv);
server->unregistered_cq = cq;
/* decremented by grpc_server_destroy */
@@ -781,6 +785,15 @@ grpc_transport_setup_result grpc_server_setup_transport(
return result;
}
+static int num_listeners(grpc_server *server) {
+ listener *l;
+ int n = 0;
+ for (l = server->listeners; l; l = l->next) {
+ n++;
+ }
+ return n;
+}
+
static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
void *shutdown_tag) {
listener *l;
@@ -878,11 +891,6 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
for (l = server->listeners; l; l = l->next) {
l->destroy(server, l->arg);
}
- while (server->listeners) {
- l = server->listeners;
- server->listeners = l->next;
- gpr_free(l);
- }
}
void grpc_server_shutdown(grpc_server *server) {
@@ -893,8 +901,18 @@ void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
shutdown_internal(server, 1, tag);
}
+void grpc_server_listener_destroy_done(void *s) {
+ grpc_server *server = s;
+ gpr_mu_lock(&server->mu);
+ server->listeners_destroyed++;
+ gpr_cv_signal(&server->cv);
+ gpr_mu_unlock(&server->mu);
+}
+
void grpc_server_destroy(grpc_server *server) {
channel_data *c;
+ listener *l;
+ size_t i;
gpr_mu_lock(&server->mu);
if (!server->shutdown) {
gpr_mu_unlock(&server->mu);
@@ -902,6 +920,22 @@ void grpc_server_destroy(grpc_server *server) {
gpr_mu_lock(&server->mu);
}
+ while (server->listeners_destroyed != num_listeners(server)) {
+ for (i = 0; i < server->cq_count; i++) {
+ gpr_mu_unlock(&server->mu);
+ grpc_cq_hack_spin_pollset(server->cqs[i]);
+ gpr_mu_lock(&server->mu);
+ }
+
+ gpr_cv_wait(&server->cv, &server->mu, gpr_time_add(gpr_now(), gpr_time_from_millis(100)));
+ }
+
+ while (server->listeners) {
+ l = server->listeners;
+ server->listeners = l->next;
+ gpr_free(l);
+ }
+
for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) {
shutdown_channel(c);
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index e33f69b8c7..548a16c6c9 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -51,6 +51,8 @@ void grpc_server_add_listener(grpc_server *server, void *listener,
grpc_pollset **pollsets, size_t npollsets),
void (*destroy)(grpc_server *server, void *arg));
+void grpc_server_listener_destroy_done(void *server);
+
/* Setup a transport - creates a channel stack, binds the transport to the
server */
grpc_transport_setup_result grpc_server_setup_transport(
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 27434b39e2..9a23125752 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -75,7 +75,7 @@ static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, size
callbacks) */
static void destroy(grpc_server *server, void *tcpp) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_destroy(tcp);
+ grpc_tcp_server_destroy(tcp, grpc_server_listener_destroy_done, server);
}
int grpc_server_add_http2_port(grpc_server *server, const char *addr) {
@@ -131,7 +131,7 @@ error:
grpc_resolved_addresses_destroy(resolved);
}
if (tcp) {
- grpc_tcp_server_destroy(tcp);
+ grpc_tcp_server_destroy(tcp, NULL, NULL);
}
return 0;
}