diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_epollsig_linux.cc')
-rw-r--r-- | src/core/lib/iomgr/ev_epollsig_linux.cc | 68 |
1 files changed, 47 insertions, 21 deletions
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index a144817a83..2189801c18 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -132,6 +132,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; grpc_closure* on_done_closure; @@ -141,6 +142,9 @@ struct grpc_fd { gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; + + /* Do we need to track EPOLLERR events separately? */ + bool track_err; }; /* Reference counting for fds */ @@ -352,7 +356,10 @@ static void polling_island_add_fds_locked(polling_island* pi, grpc_fd** fds, for (i = 0; i < fd_count; i++) { ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET); - ev.data.ptr = fds[i]; + /* Use the least significant bit of ev.data.ptr to store track_err to avoid + * synchronization issues when accessing it after receiving an event */ + ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fds[i]) | + (fds[i]->track_err ? 1 : 0)); err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev); if (err < 0) { @@ -435,7 +442,6 @@ static void polling_island_remove_all_fds_locked(polling_island* pi, /* The caller is expected to hold pi->mu lock before calling this function */ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd, - bool is_fd_closed, grpc_error** error) { int err; size_t i; @@ -444,16 +450,14 @@ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd, /* If fd is already closed, then it would have been automatically been removed from the epoll set */ - if (!is_fd_closed) { - err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr); - if (err < 0 && errno != ENOENT) { - gpr_asprintf( - &err_msg, - "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)", - pi->epoll_fd, fd->fd, errno, strerror(errno)); - append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); - gpr_free(err_msg); - } + err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr); + if (err < 0 && errno != ENOENT) { + gpr_asprintf( + &err_msg, + "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)", + pi->epoll_fd, fd->fd, errno, strerror(errno)); + append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); + gpr_free(err_msg); } for (i = 0; i < pi->fd_cnt; i++) { @@ -769,6 +773,7 @@ static void unref_by(grpc_fd* fd, int n) { fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } else { @@ -806,7 +811,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -821,6 +826,7 @@ static grpc_fd* fd_create(int fd, const char* name) { gpr_mu_init(&new_fd->po.mu); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } /* Note: It is not really needed to get the new_fd->po.mu lock here. If this @@ -837,6 +843,8 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->orphaned = false; new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); + new_fd->track_err = track_err; gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -863,7 +871,7 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { + const char* reason) { grpc_error* error = GRPC_ERROR_NONE; polling_island* unref_pi = nullptr; @@ -884,7 +892,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, before doing this.) */ if (fd->po.pi != nullptr) { polling_island* pi_latest = polling_island_lock(fd->po.pi); - polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error); + polling_island_remove_fd_locked(pi_latest, fd, &error); gpr_mu_unlock(&pi_latest->mu); unref_pi = fd->po.pi; @@ -933,6 +941,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { shutdown(fd->fd, SHUT_RDWR); fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -945,6 +954,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + /******************************************************************************* * Pollset Definitions */ @@ -1116,6 +1129,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + static void pollset_release_polling_island(grpc_pollset* ps, const char* reason) { if (ps->po.pi != nullptr) { @@ -1254,14 +1269,23 @@ static void pollset_work_and_unlock(grpc_pollset* pollset, to the function pollset_work_and_unlock() will pick up the correct epoll_fd */ } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write_ev = ep_ev[i].events & EPOLLOUT; - if (read_ev || cancel) { + grpc_fd* fd = reinterpret_cast<grpc_fd*>( + reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1)); + bool track_err = + reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1); + bool cancel = (ep_ev[i].events & EPOLLHUP) != 0; + bool error = (ep_ev[i].events & EPOLLERR) != 0; + bool read_ev = (ep_ev[i].events & (EPOLLIN | EPOLLPRI)) != 0; + bool write_ev = (ep_ev[i].events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + + if (error && !err_fallback) { + fd_has_errors(fd); + } + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1634,6 +1658,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1641,6 +1666,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, |