diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.cc')
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 42 |
1 files changed, 36 insertions, 6 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 7903297fc6..f22eb8ee87 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -175,6 +175,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; grpc_closure* on_done_closure; @@ -184,6 +185,9 @@ struct grpc_fd { gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; + + /* Do we need to track EPOLLERR events separately? */ + bool track_err; }; static void fd_global_init(void); @@ -309,6 +313,7 @@ static void fd_destroy(void* arg, grpc_error* error) { fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } @@ -348,7 +353,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -362,6 +367,7 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd))); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } gpr_mu_init(&new_fd->pollable_mu); @@ -369,9 +375,11 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->pollable_obj = nullptr; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; + new_fd->track_err = track_err; new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -440,6 +448,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { shutdown(fd->fd, SHUT_RDWR); fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -452,6 +461,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + /******************************************************************************* * Pollable Definitions */ @@ -579,7 +592,12 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { struct epoll_event ev_fd; ev_fd.events = static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); - ev_fd.data.ptr = fd; + /* Use the second least significant bit of ev_fd.data.ptr to store track_err + * to avoid synchronization issues when accessing it after receiving an event. + * Accessing fd would be a data race there because the fd might have been + * returned to the free list at that point. */ + ev_fd.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) | + (fd->track_err ? 2 : 0)); GRPC_STATS_INC_SYSCALL_EPOLL_CTL(); if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { @@ -780,6 +798,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) { gpr_mu_lock(&fd->pollable_mu); grpc_error* error = GRPC_ERROR_NONE; @@ -848,20 +868,28 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, (intptr_t)data_ptr)), err_desc); } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; + grpc_fd* fd = + reinterpret_cast<grpc_fd*>(reinterpret_cast<intptr_t>(data_ptr) & ~2); + bool track_err = reinterpret_cast<intptr_t>(data_ptr) & 2; + bool cancel = (ev->events & EPOLLHUP) != 0; + bool error = (ev->events & EPOLLERR) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "PS:%p got fd %p: cancel=%d read=%d " "write=%d", pollset, fd, cancel, read_ev, write_ev); } - if (read_ev || cancel) { + if (error && !err_fallback) { + fd_has_errors(fd); + } + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1503,6 +1531,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1510,6 +1539,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, |