aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/ev_epollsig_linux.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/ev_epollsig_linux.cc')
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc68
1 files changed, 47 insertions, 21 deletions
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index a144817a83..2189801c18 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -132,6 +132,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
+ grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
@@ -141,6 +142,9 @@ struct grpc_fd {
gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
+
+ /* Do we need to track EPOLLERR events separately? */
+ bool track_err;
};
/* Reference counting for fds */
@@ -352,7 +356,10 @@ static void polling_island_add_fds_locked(polling_island* pi, grpc_fd** fds,
for (i = 0; i < fd_count; i++) {
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = fds[i];
+ /* Use the least significant bit of ev.data.ptr to store track_err to avoid
+ * synchronization issues when accessing it after receiving an event */
+ ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fds[i]) |
+ (fds[i]->track_err ? 1 : 0));
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
if (err < 0) {
@@ -435,7 +442,6 @@ static void polling_island_remove_all_fds_locked(polling_island* pi,
/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
- bool is_fd_closed,
grpc_error** error) {
int err;
size_t i;
@@ -444,16 +450,14 @@ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
/* If fd is already closed, then it would have been automatically been removed
from the epoll set */
- if (!is_fd_closed) {
- err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr);
- if (err < 0 && errno != ENOENT) {
- gpr_asprintf(
- &err_msg,
- "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
- pi->epoll_fd, fd->fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- }
+ err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr);
+ if (err < 0 && errno != ENOENT) {
+ gpr_asprintf(
+ &err_msg,
+ "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
+ pi->epoll_fd, fd->fd, errno, strerror(errno));
+ append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
+ gpr_free(err_msg);
}
for (i = 0; i < pi->fd_cnt; i++) {
@@ -769,6 +773,7 @@ static void unref_by(grpc_fd* fd, int n) {
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
+ fd->error_closure->DestroyEvent();
gpr_mu_unlock(&fd_freelist_mu);
} else {
@@ -806,7 +811,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@@ -821,6 +826,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
gpr_mu_init(&new_fd->po.mu);
new_fd->read_closure.Init();
new_fd->write_closure.Init();
+ new_fd->error_closure.Init();
}
/* Note: It is not really needed to get the new_fd->po.mu lock here. If this
@@ -837,6 +843,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->orphaned = false;
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
+ new_fd->error_closure->InitEvent();
+ new_fd->track_err = track_err;
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -863,7 +871,7 @@ static int fd_wrapped_fd(grpc_fd* fd) {
}
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
- bool already_closed, const char* reason) {
+ const char* reason) {
grpc_error* error = GRPC_ERROR_NONE;
polling_island* unref_pi = nullptr;
@@ -884,7 +892,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
before doing this.) */
if (fd->po.pi != nullptr) {
polling_island* pi_latest = polling_island_lock(fd->po.pi);
- polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
+ polling_island_remove_fd_locked(pi_latest, fd, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;
@@ -933,6 +941,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR);
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
+ fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@@ -945,6 +954,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ fd->error_closure->NotifyOn(closure);
+}
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -1116,6 +1129,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
static void pollset_release_polling_island(grpc_pollset* ps,
const char* reason) {
if (ps->po.pi != nullptr) {
@@ -1254,14 +1269,23 @@ static void pollset_work_and_unlock(grpc_pollset* pollset,
to the function pollset_work_and_unlock() will pick up the correct
epoll_fd */
} else {
- grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (read_ev || cancel) {
+ grpc_fd* fd = reinterpret_cast<grpc_fd*>(
+ reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
+ bool track_err =
+ reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1);
+ bool cancel = (ep_ev[i].events & EPOLLHUP) != 0;
+ bool error = (ep_ev[i].events & EPOLLERR) != 0;
+ bool read_ev = (ep_ev[i].events & (EPOLLIN | EPOLLPRI)) != 0;
+ bool write_ev = (ep_ev[i].events & EPOLLOUT) != 0;
+ bool err_fallback = error && !track_err;
+
+ if (error && !err_fallback) {
+ fd_has_errors(fd);
+ }
+ if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
- if (write_ev || cancel) {
+ if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@@ -1634,6 +1658,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ true,
fd_create,
fd_wrapped_fd,
@@ -1641,6 +1666,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,