diff options
-rw-r--r-- | src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.cc | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollsig_linux.cc | 23 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.cc | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.cc | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_posix.cc | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.cc | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.cc | 3 | ||||
-rw-r--r-- | test/core/iomgr/ev_epollsig_linux_test.cc | 6 | ||||
-rw-r--r-- | test/core/iomgr/fd_posix_test.cc | 8 | ||||
-rw-r--r-- | test/core/iomgr/pollset_set_test.cc | 3 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_pollset.cc | 4 |
15 files changed, 38 insertions, 56 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index 151865cea7..f496e9694d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -104,17 +104,11 @@ static void fd_node_destroy(fd_node* fdn) { GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(fdn->already_shutdown); gpr_mu_destroy(&fdn->mu); - /* TODO: we need to pass a non-null "release_fd" parameter to - * grpc_fd_orphan because "epollsig" iomgr will close the fd - * even if "already_closed" is true, and it only leaves it open - * if "release_fd" is non-null. This is unlike the rest of the - * pollers, should this be changed within epollsig? */ - int dummy_release_fd; /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ - grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, true /* already_closed */, - "c-ares query finished"); + int dummy_release_fd; + grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, "c-ares query finished"); gpr_free(fdn); } diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 98ab974057..86a0243d2e 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -346,7 +346,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { + const char* reason) { grpc_error* error = GRPC_ERROR_NONE; bool is_release_fd = (release_fd != nullptr); @@ -359,7 +359,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, descriptor fd->fd (but we still own the grpc_fd structure). */ if (is_release_fd) { *release_fd = fd->fd; - } else if (!already_closed) { + } else { close(fd->fd); } diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index e9b74c6b64..111b62171b 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -403,8 +403,8 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { - bool is_fd_closed = already_closed; + const char* reason) { + bool is_fd_closed = false; gpr_mu_lock(&fd->orphan_mu); @@ -414,7 +414,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, descriptor fd->fd (but we still own the grpc_fd structure). */ if (release_fd != nullptr) { *release_fd = fd->fd; - } else if (!is_fd_closed) { + } else { close(fd->fd); is_fd_closed = true; } diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 20512e31ac..2189801c18 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -442,7 +442,6 @@ static void polling_island_remove_all_fds_locked(polling_island* pi, /* The caller is expected to hold pi->mu lock before calling this function */ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd, - bool is_fd_closed, grpc_error** error) { int err; size_t i; @@ -451,16 +450,14 @@ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd, /* If fd is already closed, then it would have been automatically been removed from the epoll set */ - if (!is_fd_closed) { - err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr); - if (err < 0 && errno != ENOENT) { - gpr_asprintf( - &err_msg, - "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)", - pi->epoll_fd, fd->fd, errno, strerror(errno)); - append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); - gpr_free(err_msg); - } + err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr); + if (err < 0 && errno != ENOENT) { + gpr_asprintf( + &err_msg, + "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)", + pi->epoll_fd, fd->fd, errno, strerror(errno)); + append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); + gpr_free(err_msg); } for (i = 0; i < pi->fd_cnt; i++) { @@ -874,7 +871,7 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { + const char* reason) { grpc_error* error = GRPC_ERROR_NONE; polling_island* unref_pi = nullptr; @@ -895,7 +892,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, before doing this.) */ if (fd->po.pi != nullptr) { polling_island* pi_latest = polling_island_lock(fd->po.pi); - polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error); + polling_island_remove_fd_locked(pi_latest, fd, &error); gpr_mu_unlock(&pi_latest->mu); unref_pi = fd->po.pi; diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index df6b0e1e89..c9c09881a2 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -425,14 +425,12 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { + const char* reason) { fd->on_done_closure = on_done; fd->released = release_fd != nullptr; if (release_fd != nullptr) { *release_fd = fd->fd; fd->released = true; - } else if (already_closed) { - fd->released = true; } gpr_mu_lock(&fd->mu); REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 82b21df7ba..1139b3273a 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -209,13 +209,12 @@ int grpc_fd_wrapped_fd(grpc_fd* fd) { } void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { - GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %d, %s)", - grpc_fd_wrapped_fd(fd), on_done, release_fd, - already_closed, reason); + const char* reason) { + GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %s)", grpc_fd_wrapped_fd(fd), + on_done, release_fd, reason); GRPC_FD_TRACE("grpc_fd_orphan, fd:%d closed", grpc_fd_wrapped_fd(fd)); - g_event_engine->fd_orphan(fd, on_done, release_fd, already_closed, reason); + g_event_engine->fd_orphan(fd, on_done, release_fd, reason); } void grpc_fd_shutdown(grpc_fd* fd, grpc_error* why) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index e3996ce365..b4c17fc80d 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -46,7 +46,7 @@ typedef struct grpc_event_engine_vtable { grpc_fd* (*fd_create)(int fd, const char* name, bool track_err); int (*fd_wrapped_fd)(grpc_fd* fd); void (*fd_orphan)(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason); + const char* reason); void (*fd_shutdown)(grpc_fd* fd, grpc_error* why); void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure); void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure); @@ -112,7 +112,7 @@ int grpc_fd_wrapped_fd(grpc_fd* fd); notify_on_write. MUST NOT be called with a pollset lock taken */ void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason); + const char* reason); /* Has grpc_fd_shutdown been called on an fd? */ bool grpc_fd_is_shutdown(grpc_fd* fd); diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 71e08f1230..296ee74311 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -211,8 +211,7 @@ static void on_writable(void* acp, grpc_error* error) { finish: if (fd != nullptr) { grpc_pollset_set_del_fd(ac->interested_parties, fd); - grpc_fd_orphan(fd, nullptr, nullptr, false /* already_closed */, - "tcp_client_orphan"); + grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan"); fd = nullptr; } done = (--ac->refs == 0); @@ -305,8 +304,7 @@ void grpc_tcp_client_create_from_prepared_fd( return; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { - grpc_fd_orphan(fdobj, nullptr, nullptr, false /* already_closed */, - "tcp_client_connect_error"); + grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error"); GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect")); return; } diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 43d545846d..9df2e206b2 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -297,7 +297,7 @@ static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) { static void tcp_free(grpc_tcp* tcp) { grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, - false /* already_closed */, "tcp_unref_orphan"); + "tcp_unref_orphan"); grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); grpc_resource_user_unref(tcp->resource_user); gpr_free(tcp->peer_string); diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index ce18ff99e6..8ddf684fea 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -150,7 +150,7 @@ static void deactivated_all_ports(grpc_tcp_server* s) { GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, grpc_schedule_on_exec_ctx); grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr, - false /* already_closed */, "tcp_listener_shutdown"); + "tcp_listener_shutdown"); } gpr_mu_unlock(&s->mu); } else { diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 99368020d4..bdb2d0e764 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -300,8 +300,7 @@ void GrpcUdpListener::OrphanFd() { grpc_schedule_on_exec_ctx); /* Because at this point, all listening sockets have been shutdown already, no * need to call OnFdAboutToOrphan() to notify the handler again. */ - grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, - false /* already_closed */, "udp_listener_shutdown"); + grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, "udp_listener_shutdown"); } void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { diff --git a/test/core/iomgr/ev_epollsig_linux_test.cc b/test/core/iomgr/ev_epollsig_linux_test.cc index ac10494631..28c9dd408c 100644 --- a/test/core/iomgr/ev_epollsig_linux_test.cc +++ b/test/core/iomgr/ev_epollsig_linux_test.cc @@ -79,8 +79,7 @@ static void test_fd_cleanup(test_fd* tfds, int num_fds) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup")); grpc_core::ExecCtx::Get()->Flush(); - grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, false /* already_closed */, - "test_fd_cleanup"); + grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, "test_fd_cleanup"); grpc_core::ExecCtx::Get()->Flush(); GPR_ASSERT(release_fd == tfds[i].inner_fd); @@ -287,8 +286,7 @@ static void test_threading(void) { { grpc_core::ExecCtx exec_ctx; grpc_fd_shutdown(shared.wakeup_desc, GRPC_ERROR_CANCELLED); - grpc_fd_orphan(shared.wakeup_desc, nullptr, nullptr, - false /* already_closed */, "done"); + grpc_fd_orphan(shared.wakeup_desc, nullptr, nullptr, "done"); grpc_pollset_shutdown(shared.pollset, GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset, grpc_schedule_on_exec_ctx)); diff --git a/test/core/iomgr/fd_posix_test.cc b/test/core/iomgr/fd_posix_test.cc index 4a625dd906..4ea2389bbd 100644 --- a/test/core/iomgr/fd_posix_test.cc +++ b/test/core/iomgr/fd_posix_test.cc @@ -115,7 +115,7 @@ static void session_shutdown_cb(void* arg, /*session */ bool success) { session* se = static_cast<session*>(arg); server* sv = se->sv; - grpc_fd_orphan(se->em_fd, nullptr, nullptr, false /* already_closed */, "a"); + grpc_fd_orphan(se->em_fd, nullptr, nullptr, "a"); gpr_free(se); /* Start to shutdown listen fd. */ grpc_fd_shutdown(sv->em_fd, @@ -171,7 +171,7 @@ static void session_read_cb(void* arg, /*session */ static void listen_shutdown_cb(void* arg /*server */, int success) { server* sv = static_cast<server*>(arg); - grpc_fd_orphan(sv->em_fd, nullptr, nullptr, false /* already_closed */, "b"); + grpc_fd_orphan(sv->em_fd, nullptr, nullptr, "b"); gpr_mu_lock(g_mu); sv->done = 1; @@ -289,7 +289,7 @@ static void client_init(client* cl) { /* Called when a client upload session is ready to shutdown. */ static void client_session_shutdown_cb(void* arg /*client */, int success) { client* cl = static_cast<client*>(arg); - grpc_fd_orphan(cl->em_fd, nullptr, nullptr, false /* already_closed */, "c"); + grpc_fd_orphan(cl->em_fd, nullptr, nullptr, "c"); cl->done = 1; GPR_ASSERT( GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); @@ -502,7 +502,7 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(g_mu); - grpc_fd_orphan(em_fd, nullptr, nullptr, false /* already_closed */, "d"); + grpc_fd_orphan(em_fd, nullptr, nullptr, "d"); destroy_change_data(&a); destroy_change_data(&b); diff --git a/test/core/iomgr/pollset_set_test.cc b/test/core/iomgr/pollset_set_test.cc index e2e63b2918..1aae1daa02 100644 --- a/test/core/iomgr/pollset_set_test.cc +++ b/test/core/iomgr/pollset_set_test.cc @@ -136,8 +136,7 @@ static void cleanup_test_fds(test_fd* tfds, const int num_fds) { * grpc_wakeup_fd and we would like to destroy it ourselves (by calling * grpc_wakeup_fd_destroy). To prevent grpc_fd from calling close() on the * underlying fd, call it with a non-NULL 'release_fd' parameter */ - grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, false /* already_closed */, - "test_fd_cleanup"); + grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, "test_fd_cleanup"); grpc_core::ExecCtx::Get()->Flush(); grpc_wakeup_fd_destroy(&tfds[i].wakeup_fd); diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc index a4383b83cb..050c7f7c17 100644 --- a/test/cpp/microbenchmarks/bm_pollset.cc +++ b/test/cpp/microbenchmarks/bm_pollset.cc @@ -146,7 +146,7 @@ static void BM_PollAddFd(benchmark::State& state) { grpc_pollset_add_fd(ps, fd); grpc_core::ExecCtx::Get()->Flush(); } - grpc_fd_orphan(fd, nullptr, nullptr, false /* already_closed */, "xxx"); + grpc_fd_orphan(fd, nullptr, nullptr, "xxx"); grpc_closure shutdown_ps_closure; GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps, grpc_schedule_on_exec_ctx); @@ -242,7 +242,7 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) { while (!done) { GRPC_ERROR_UNREF(grpc_pollset_work(ps, nullptr, GRPC_MILLIS_INF_FUTURE)); } - grpc_fd_orphan(wakeup, nullptr, nullptr, false /* already_closed */, "done"); + grpc_fd_orphan(wakeup, nullptr, nullptr, "done"); wakeup_fd.read_fd = 0; grpc_closure shutdown_ps_closure; GRPC_CLOSURE_INIT(&shutdown_ps_closure, shutdown_ps, ps, |