aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/udp_server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/udp_server.c')
-rw-r--r--src/core/lib/iomgr/udp_server.c38
1 files changed, 29 insertions, 9 deletions
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 60579e18ba..ca283d034f 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -79,10 +79,15 @@ struct grpc_udp_listener {
grpc_resolved_address addr;
grpc_closure read_closure;
grpc_closure write_closure;
+ // To be called when corresponding QuicGrpcServer closes all active
+ // connections.
+ grpc_closure orphan_fd_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
+ // True if orphan_cb is trigered.
+ bool orphan_notified;
struct grpc_udp_listener *next;
};
@@ -146,6 +151,14 @@ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) {
return s;
}
+static void shutdown_fd(grpc_exec_ctx *exec_ctx, void *fd, grpc_error *error) {
+ grpc_fd_shutdown(exec_ctx, (grpc_fd *)fd, GRPC_ERROR_REF(error));
+}
+
+static void dummy_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ // No-op.
+}
+
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
if (s->shutdown_complete != NULL) {
grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
@@ -195,12 +208,16 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
-
- /* Call the orphan_cb to signal that the FD is about to be closed and
- * should no longer be used. */
- GPR_ASSERT(sp->orphan_cb);
- sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data);
-
+ if (!sp->orphan_notified) {
+ /* Call the orphan_cb to signal that the FD is about to be closed and
+ * should no longer be used. Because at this point, all listening ports
+ * have been shutdown already, no need to shutdown again.*/
+ grpc_closure_init(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
+ grpc_schedule_on_exec_ctx);
+ GPR_ASSERT(sp->orphan_cb);
+ sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
+ sp->server->user_data);
+ }
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
}
@@ -225,9 +242,11 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
if (s->active_ports) {
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
- sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data);
- grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Server destroyed"));
+ grpc_closure_init(&sp->orphan_fd_closure, shutdown_fd, sp->emfd,
+ grpc_schedule_on_exec_ctx);
+ sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
+ sp->server->user_data);
+ sp->orphan_notified = true;
}
gpr_mu_unlock(&s->mu);
} else {
@@ -391,6 +410,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
sp->read_cb = read_cb;
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
+ sp->orphan_notified = false;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(name);