aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c7
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c4
-rw-r--r--src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c5
-rw-r--r--src/core/lib/iomgr/ev_epoll_thread_pool_linux.c6
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c6
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.c5
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c7
-rw-r--r--src/core/lib/iomgr/ev_posix.c5
-rw-r--r--src/core/lib/iomgr/ev_posix.h4
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c6
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c2
-rw-r--r--src/core/lib/iomgr/udp_server.c2
-rw-r--r--test/core/iomgr/ev_epollsig_linux_test.c6
-rw-r--r--test/core/iomgr/fd_posix_test.c11
-rw-r--r--test/core/iomgr/pollset_set_test.c3
-rw-r--r--test/cpp/microbenchmarks/bm_pollset.cc5
17 files changed, 48 insertions, 38 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
index 1ab8295e9e..b696344eab 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
@@ -103,10 +103,9 @@ static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) {
grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd);
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following
- grpc_fd_orphan. To prevent this fd from being closed by grpc_fd_orphan,
- a fd pointer is provided. */
- int fd;
- grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, &fd, "c-ares query finished");
+ grpc_fd_orphan. */
+ grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */,
+ "c-ares query finished");
gpr_free(fdn);
}
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index b89b8af15a..8db4a92453 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -249,7 +249,7 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
@@ -260,7 +260,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != NULL) {
*release_fd = fd->fd;
- } else {
+ } else if (!already_closed) {
close(fd->fd);
}
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index 5166dc2ac2..f2f3e15704 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -931,7 +931,7 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
@@ -952,8 +952,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
before doing this.) */
if (fd->po.pi != NULL) {
polling_island *pi_latest = polling_island_lock(fd->po.pi);
- polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */,
- &error);
+ polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
index 1058f69a83..07c8eadf4f 100644
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
@@ -493,8 +493,8 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
- bool is_fd_closed = false;
+ bool already_closed, const char *reason) {
+ bool is_fd_closed = already_closed;
grpc_error *error = GRPC_ERROR_NONE;
epoll_set *unref_eps = NULL;
@@ -505,7 +505,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != NULL) {
*release_fd = fd->fd;
- } else {
+ } else if (!is_fd_closed) {
close(fd->fd);
is_fd_closed = true;
}
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index a2fa4ad9a5..770d1fd0a9 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -380,8 +380,8 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
- bool is_fd_closed = false;
+ bool already_closed, const char *reason) {
+ bool is_fd_closed = already_closed;
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&fd->pollable.po.mu);
@@ -392,7 +392,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != NULL) {
*release_fd = fd->fd;
- } else {
+ } else if (!is_fd_closed) {
close(fd->fd);
is_fd_closed = true;
}
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index a84c208e1a..11067e3959 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -854,7 +854,7 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
@@ -875,8 +875,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
before doing this.) */
if (fd->po.pi != NULL) {
polling_island *pi_latest = polling_island_lock(fd->po.pi);
- polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */,
- &error);
+ polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 1f8d7eef26..365aa583bb 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -398,11 +398,14 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
fd->on_done_closure = on_done;
fd->released = release_fd != NULL;
- if (fd->released) {
+ if (release_fd != NULL) {
*release_fd = fd->fd;
+ fd->released = true;
+ } else if (already_closed) {
+ fd->released = true;
}
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index cff77e641c..91f8cd5482 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -170,8 +170,9 @@ int grpc_fd_wrapped_fd(grpc_fd *fd) {
}
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- int *release_fd, const char *reason) {
- g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, reason);
+ int *release_fd, bool already_closed, const char *reason) {
+ g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, already_closed,
+ reason);
}
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 0de7333843..1108e46ef8 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -37,7 +37,7 @@ typedef struct grpc_event_engine_vtable {
grpc_fd *(*fd_create)(int fd, const char *name);
int (*fd_wrapped_fd)(grpc_fd *fd);
void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- int *release_fd, const char *reason);
+ int *release_fd, bool already_closed, const char *reason);
void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
void (*fd_notify_on_read)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
@@ -104,7 +104,7 @@ int grpc_fd_wrapped_fd(grpc_fd *fd);
notify_on_write.
MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- int *release_fd, const char *reason);
+ int *release_fd, bool already_closed, const char *reason);
/* Has grpc_fd_shutdown been called on an fd? */
bool grpc_fd_is_shutdown(grpc_fd *fd);
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 21e320a6e7..a25fba4527 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -209,7 +209,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
finish:
if (fd != NULL) {
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
- grpc_fd_orphan(exec_ctx, fd, NULL, NULL, "tcp_client_orphan");
+ grpc_fd_orphan(exec_ctx, fd, NULL, NULL, false /* already_closed */,
+ "tcp_client_orphan");
fd = NULL;
}
done = (--ac->refs == 0);
@@ -295,7 +296,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
- grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
+ grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, false /* already_closed */,
+ "tcp_client_connect_error");
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"));
goto done;
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index b6dcd15cb0..2f543fd8a9 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -156,7 +156,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
- "tcp_unref_orphan");
+ false /* already_closed */, "tcp_unref_orphan");
grpc_slice_buffer_destroy_internal(exec_ctx, &tcp->last_read_buffer);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp->peer_string);
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index f304642951..0fc5c0fd86 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -166,7 +166,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
- "tcp_listener_shutdown");
+ false /* already_closed */, "tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 54e7f417a7..88fa34cb7a 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -214,7 +214,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
sp->server->user_data);
}
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
- "udp_listener_shutdown");
+ false /* already_closed */, "udp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
diff --git a/test/core/iomgr/ev_epollsig_linux_test.c b/test/core/iomgr/ev_epollsig_linux_test.c
index 1d272fa406..c702065d4d 100644
--- a/test/core/iomgr/ev_epollsig_linux_test.c
+++ b/test/core/iomgr/ev_epollsig_linux_test.c
@@ -79,7 +79,8 @@ static void test_fd_cleanup(grpc_exec_ctx *exec_ctx, test_fd *tfds,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup"));
grpc_exec_ctx_flush(exec_ctx);
- grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup");
+ grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd,
+ false /* already_closed */, "test_fd_cleanup");
grpc_exec_ctx_flush(exec_ctx);
GPR_ASSERT(release_fd == tfds[i].inner_fd);
@@ -294,7 +295,8 @@ static void test_threading(void) {
{
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_fd_shutdown(&exec_ctx, shared.wakeup_desc, GRPC_ERROR_CANCELLED);
- grpc_fd_orphan(&exec_ctx, shared.wakeup_desc, NULL, NULL, "done");
+ grpc_fd_orphan(&exec_ctx, shared.wakeup_desc, NULL, NULL,
+ false /* already_closed */, "done");
grpc_pollset_shutdown(&exec_ctx, shared.pollset,
GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset,
grpc_schedule_on_exec_ctx));
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index 02596450d2..85d5d9c07f 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -114,7 +114,8 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
bool success) {
session *se = arg;
server *sv = se->sv;
- grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a");
+ grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, false /* already_closed */,
+ "a");
gpr_free(se);
/* Start to shutdown listen fd. */
grpc_fd_shutdown(exec_ctx, sv->em_fd,
@@ -171,7 +172,8 @@ static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */,
int success) {
server *sv = arg;
- grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, "b");
+ grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, false /* already_closed */,
+ "b");
gpr_mu_lock(g_mu);
sv->done = 1;
@@ -291,7 +293,8 @@ static void client_init(client *cl) {
static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx,
void *arg /*client */, int success) {
client *cl = arg;
- grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, "c");
+ grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, false /* already_closed */,
+ "c");
cl->done = 1;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
@@ -511,7 +514,7 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(g_mu);
- grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, "d");
+ grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, false /* already_closed */, "d");
grpc_exec_ctx_finish(&exec_ctx);
destroy_change_data(&a);
destroy_change_data(&b);
diff --git a/test/core/iomgr/pollset_set_test.c b/test/core/iomgr/pollset_set_test.c
index 6aedaf1081..5750ac0f4b 100644
--- a/test/core/iomgr/pollset_set_test.c
+++ b/test/core/iomgr/pollset_set_test.c
@@ -137,7 +137,8 @@ static void cleanup_test_fds(grpc_exec_ctx *exec_ctx, test_fd *tfds,
* grpc_wakeup_fd and we would like to destroy it ourselves (by calling
* grpc_wakeup_fd_destroy). To prevent grpc_fd from calling close() on the
* underlying fd, call it with a non-NULL 'release_fd' parameter */
- grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd, "test_fd_cleanup");
+ grpc_fd_orphan(exec_ctx, tfds[i].fd, NULL, &release_fd,
+ false /* already_closed */, "test_fd_cleanup");
grpc_exec_ctx_flush(exec_ctx);
grpc_wakeup_fd_destroy(&tfds[i].wakeup_fd);
diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc
index 683f4703c2..1fc1f2f83b 100644
--- a/test/cpp/microbenchmarks/bm_pollset.cc
+++ b/test/cpp/microbenchmarks/bm_pollset.cc
@@ -149,7 +149,7 @@ static void BM_PollAddFd(benchmark::State& state) {
grpc_pollset_add_fd(&exec_ctx, ps, fd);
grpc_exec_ctx_flush(&exec_ctx);
}
- grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, "xxx");
+ grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, false /* already_closed */, "xxx");
grpc_closure shutdown_ps_closure;
GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,
grpc_schedule_on_exec_ctx);
@@ -247,7 +247,8 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
while (!done) {
GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline));
}
- grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, "done");
+ grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, false /* already_closed */,
+ "done");
wakeup_fd.read_fd = 0;
grpc_closure shutdown_ps_closure;
GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps,