diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.cc')
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 55 |
1 files changed, 45 insertions, 10 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 7903297fc6..111b62171b 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -175,6 +175,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; grpc_closure* on_done_closure; @@ -184,6 +185,9 @@ struct grpc_fd { gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; + + /* Do we need to track EPOLLERR events separately? */ + bool track_err; }; static void fd_global_init(void); @@ -309,6 +313,7 @@ static void fd_destroy(void* arg, grpc_error* error) { fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } @@ -348,7 +353,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -362,6 +367,7 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd))); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } gpr_mu_init(&new_fd->pollable_mu); @@ -369,9 +375,11 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->pollable_obj = nullptr; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; + new_fd->track_err = track_err; new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -395,8 +403,8 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { - bool is_fd_closed = already_closed; + const char* reason) { + bool is_fd_closed = false; gpr_mu_lock(&fd->orphan_mu); @@ -406,7 +414,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, descriptor fd->fd (but we still own the grpc_fd structure). */ if (release_fd != nullptr) { *release_fd = fd->fd; - } else if (!is_fd_closed) { + } else { close(fd->fd); is_fd_closed = true; } @@ -438,8 +446,14 @@ static bool fd_is_shutdown(grpc_fd* fd) { /* Might be called multiple times */ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { - shutdown(fd->fd, SHUT_RDWR); + if (shutdown(fd->fd, SHUT_RDWR)) { + if (errno != ENOTCONN) { + gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d", + grpc_fd_wrapped_fd(fd), errno); + } + } fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -452,6 +466,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + /******************************************************************************* * Pollable Definitions */ @@ -579,7 +597,12 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { struct epoll_event ev_fd; ev_fd.events = static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); - ev_fd.data.ptr = fd; + /* Use the second least significant bit of ev_fd.data.ptr to store track_err + * to avoid synchronization issues when accessing it after receiving an event. + * Accessing fd would be a data race there because the fd might have been + * returned to the free list at that point. */ + ev_fd.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) | + (fd->track_err ? 2 : 0)); GRPC_STATS_INC_SYSCALL_EPOLL_CTL(); if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { @@ -780,6 +803,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) { gpr_mu_lock(&fd->pollable_mu); grpc_error* error = GRPC_ERROR_NONE; @@ -848,20 +873,28 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, (intptr_t)data_ptr)), err_desc); } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; + grpc_fd* fd = + reinterpret_cast<grpc_fd*>(reinterpret_cast<intptr_t>(data_ptr) & ~2); + bool track_err = reinterpret_cast<intptr_t>(data_ptr) & 2; + bool cancel = (ev->events & EPOLLHUP) != 0; + bool error = (ev->events & EPOLLERR) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "PS:%p got fd %p: cancel=%d read=%d " "write=%d", pollset, fd, cancel, read_ev, write_ev); } - if (read_ev || cancel) { + if (error && !err_fallback) { + fd_has_errors(fd); + } + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1503,6 +1536,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1510,6 +1544,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, |