aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-06-12 10:31:05 -0700
committerGravatar GitHub <noreply@github.com>2018-06-12 10:31:05 -0700
commita14729f22aa2fd9088fe90b82ad8a7c3b85f353c (patch)
treefd7607c47e0d103a68a3bc1a48d0cd49f5a632a2
parenta70053690a3f07ecf69ff283c288d018d2362ee4 (diff)
parentdddf20793269a7ba5f658ae5fdcde2c175cf196b (diff)
Merge pull request #15092 from yashykt/epollerr
Make linux polling engines capable of tracking errors separately
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc2
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc2
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc5
-rw-r--r--src/core/lib/iomgr/endpoint_pair_posix.cc4
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc41
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc42
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc45
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc10
-rw-r--r--src/core/lib/iomgr/ev_posix.cc17
-rw-r--r--src/core/lib/iomgr/ev_posix.h20
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc4
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc2
-rw-r--r--src/core/lib/iomgr/udp_server.cc2
-rw-r--r--test/core/iomgr/ev_epollsig_linux_test.cc4
-rw-r--r--test/core/iomgr/fd_posix_test.cc8
-rw-r--r--test/core/iomgr/pollset_set_test.cc2
-rw-r--r--test/core/iomgr/tcp_posix_test.cc20
-rw-r--r--test/cpp/microbenchmarks/bm_pollset.cc4
19 files changed, 180 insertions, 56 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
index ebe2c4c41c..00a84302ed 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
@@ -284,7 +284,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i);
fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node)));
gpr_log(GPR_DEBUG, "new fd: %d", socks[i]);
- fdn->fd = grpc_fd_create(socks[i], fd_name);
+ fdn->fd = grpc_fd_create(socks[i], fd_name, false);
fdn->ev_driver = ev_driver;
fdn->readable_registered = false;
fdn->writable_registered = false;
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
index b95c9dae53..dfed824cd5 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
@@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
grpc_endpoint* client = grpc_tcp_client_create_from_fd(
- grpc_fd_create(fd, "client"), args, "fd-client");
+ grpc_fd_create(fd, "client", false), args, "fd-client");
grpc_transport* transport =
grpc_create_chttp2_transport(final_args, client, true);
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
index 371e463814..a0228785ee 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
@@ -43,8 +43,9 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
char* name;
gpr_asprintf(&name, "fd:%d", fd);
- grpc_endpoint* server_endpoint = grpc_tcp_create(
- grpc_fd_create(fd, name), grpc_server_get_channel_args(server), name);
+ grpc_endpoint* server_endpoint =
+ grpc_tcp_create(grpc_fd_create(fd, name, false),
+ grpc_server_get_channel_args(server), name);
gpr_free(name);
diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc
index 49850ab3a1..5c5c246f99 100644
--- a/src/core/lib/iomgr/endpoint_pair_posix.cc
+++ b/src/core/lib/iomgr/endpoint_pair_posix.cc
@@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name,
grpc_core::ExecCtx exec_ctx;
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), args,
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args,
"socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), args,
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args,
"socketpair-client");
gpr_free(final_name);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index cf839619cd..98ab974057 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -136,6 +136,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
+ grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
@@ -272,7 +273,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@@ -286,11 +287,12 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
new_fd->read_closure.Init();
new_fd->write_closure.Init();
+ new_fd->error_closure.Init();
}
-
new_fd->fd = fd;
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
+ new_fd->error_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -307,7 +309,13 @@ static grpc_fd* fd_create(int fd, const char* name) {
struct epoll_event ev;
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = new_fd;
+ /* Use the least significant bit of ev.data.ptr to store track_err. We expect
+ * the addresses to be word aligned. We need to store track_err to avoid
+ * synchronization issues when accessing it after receiving an event.
+ * Accessing fd would be a data race there because the fd might have been
+ * returned to the free list at that point. */
+ ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
+ (track_err ? 1 : 0));
if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
@@ -327,6 +335,7 @@ static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
shutdown(fd->fd, SHUT_RDWR);
}
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
+ fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@@ -359,6 +368,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
grpc_iomgr_unregister_object(&fd->iomgr_object);
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
+ fd->error_closure->DestroyEvent();
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
@@ -383,6 +393,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ fd->error_closure->NotifyOn(closure);
+}
+
static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
fd->read_closure->SetReady();
/* Use release store to match with acquire load in fd_get_read_notifier */
@@ -391,6 +405,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -611,16 +627,25 @@ static grpc_error* process_epoll_events(grpc_pollset* pollset) {
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else {
- grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
- bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+ grpc_fd* fd = reinterpret_cast<grpc_fd*>(
+ reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
+ bool track_err =
+ reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
+ bool cancel = (ev->events & EPOLLHUP) != 0;
+ bool error = (ev->events & EPOLLERR) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
+ bool err_fallback = error && !track_err;
+
+ if (error && !err_fallback) {
+ fd_has_errors(fd);
+ }
- if (read_ev || cancel) {
+ if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
- if (write_ev || cancel) {
+ if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@@ -1183,6 +1208,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ true,
fd_create,
fd_wrapped_fd,
@@ -1190,6 +1216,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 7903297fc6..f22eb8ee87 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -175,6 +175,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
+ grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
@@ -184,6 +185,9 @@ struct grpc_fd {
gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
+
+ /* Do we need to track EPOLLERR events separately? */
+ bool track_err;
};
static void fd_global_init(void);
@@ -309,6 +313,7 @@ static void fd_destroy(void* arg, grpc_error* error) {
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
+ fd->error_closure->DestroyEvent();
gpr_mu_unlock(&fd_freelist_mu);
}
@@ -348,7 +353,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@@ -362,6 +367,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
new_fd->read_closure.Init();
new_fd->write_closure.Init();
+ new_fd->error_closure.Init();
}
gpr_mu_init(&new_fd->pollable_mu);
@@ -369,9 +375,11 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->pollable_obj = nullptr;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
+ new_fd->track_err = track_err;
new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1);
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
+ new_fd->error_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -440,6 +448,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR);
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
+ fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@@ -452,6 +461,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ fd->error_closure->NotifyOn(closure);
+}
+
/*******************************************************************************
* Pollable Definitions
*/
@@ -579,7 +592,12 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
struct epoll_event ev_fd;
ev_fd.events =
static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
- ev_fd.data.ptr = fd;
+ /* Use the second least significant bit of ev_fd.data.ptr to store track_err
+ * to avoid synchronization issues when accessing it after receiving an event.
+ * Accessing fd would be a data race there because the fd might have been
+ * returned to the free list at that point. */
+ ev_fd.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) |
+ (fd->track_err ? 2 : 0));
GRPC_STATS_INC_SYSCALL_EPOLL_CTL();
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
switch (errno) {
@@ -780,6 +798,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) {
gpr_mu_lock(&fd->pollable_mu);
grpc_error* error = GRPC_ERROR_NONE;
@@ -848,20 +868,28 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
(intptr_t)data_ptr)),
err_desc);
} else {
- grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
- bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+ grpc_fd* fd =
+ reinterpret_cast<grpc_fd*>(reinterpret_cast<intptr_t>(data_ptr) & ~2);
+ bool track_err = reinterpret_cast<intptr_t>(data_ptr) & 2;
+ bool cancel = (ev->events & EPOLLHUP) != 0;
+ bool error = (ev->events & EPOLLERR) != 0;
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
+ bool err_fallback = error && !track_err;
+
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO,
"PS:%p got fd %p: cancel=%d read=%d "
"write=%d",
pollset, fd, cancel, read_ev, write_ev);
}
- if (read_ev || cancel) {
+ if (error && !err_fallback) {
+ fd_has_errors(fd);
+ }
+ if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
- if (write_ev || cancel) {
+ if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@@ -1503,6 +1531,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ true,
fd_create,
fd_wrapped_fd,
@@ -1510,6 +1539,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index a144817a83..20512e31ac 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -132,6 +132,7 @@ struct grpc_fd {
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
+ grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
struct grpc_fd* freelist_next;
grpc_closure* on_done_closure;
@@ -141,6 +142,9 @@ struct grpc_fd {
gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object;
+
+ /* Do we need to track EPOLLERR events separately? */
+ bool track_err;
};
/* Reference counting for fds */
@@ -352,7 +356,10 @@ static void polling_island_add_fds_locked(polling_island* pi, grpc_fd** fds,
for (i = 0; i < fd_count; i++) {
ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = fds[i];
+ /* Use the least significant bit of ev.data.ptr to store track_err to avoid
+ * synchronization issues when accessing it after receiving an event */
+ ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fds[i]) |
+ (fds[i]->track_err ? 1 : 0));
err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
if (err < 0) {
@@ -769,6 +776,7 @@ static void unref_by(grpc_fd* fd, int n) {
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
+ fd->error_closure->DestroyEvent();
gpr_mu_unlock(&fd_freelist_mu);
} else {
@@ -806,7 +814,7 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
gpr_mu_lock(&fd_freelist_mu);
@@ -821,6 +829,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
gpr_mu_init(&new_fd->po.mu);
new_fd->read_closure.Init();
new_fd->write_closure.Init();
+ new_fd->error_closure.Init();
}
/* Note: It is not really needed to get the new_fd->po.mu lock here. If this
@@ -837,6 +846,8 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->orphaned = false;
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
+ new_fd->error_closure->InitEvent();
+ new_fd->track_err = track_err;
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
new_fd->freelist_next = nullptr;
@@ -933,6 +944,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
shutdown(fd->fd, SHUT_RDWR);
fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
+ fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
@@ -945,6 +957,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
fd->write_closure->NotifyOn(closure);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ fd->error_closure->NotifyOn(closure);
+}
+
/*******************************************************************************
* Pollset Definitions
*/
@@ -1116,6 +1132,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) {
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
+static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
+
static void pollset_release_polling_island(grpc_pollset* ps,
const char* reason) {
if (ps->po.pi != nullptr) {
@@ -1254,14 +1272,23 @@ static void pollset_work_and_unlock(grpc_pollset* pollset,
to the function pollset_work_and_unlock() will pick up the correct
epoll_fd */
} else {
- grpc_fd* fd = static_cast<grpc_fd*>(data_ptr);
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (read_ev || cancel) {
+ grpc_fd* fd = reinterpret_cast<grpc_fd*>(
+ reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
+ bool track_err =
+ reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1);
+ bool cancel = (ep_ev[i].events & EPOLLHUP) != 0;
+ bool error = (ep_ev[i].events & EPOLLERR) != 0;
+ bool read_ev = (ep_ev[i].events & (EPOLLIN | EPOLLPRI)) != 0;
+ bool write_ev = (ep_ev[i].events & EPOLLOUT) != 0;
+ bool err_fallback = error && !track_err;
+
+ if (error && !err_fallback) {
+ fd_has_errors(fd);
+ }
+ if (read_ev || cancel || err_fallback) {
fd_become_readable(fd, pollset);
}
- if (write_ev || cancel) {
+ if (write_ev || cancel || err_fallback) {
fd_become_writable(fd);
}
}
@@ -1634,6 +1661,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ true,
fd_create,
fd_wrapped_fd,
@@ -1641,6 +1669,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 70958ed562..df6b0e1e89 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -330,7 +330,8 @@ static void unref_by(grpc_fd* fd, int n) {
}
}
-static grpc_fd* fd_create(int fd, const char* name) {
+static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
+ GPR_DEBUG_ASSERT(track_err == false);
grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
gpr_mu_init(&r->mu);
gpr_atm_rel_store(&r->refst, 1);
@@ -553,6 +554,11 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
gpr_mu_unlock(&fd->mu);
}
+static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
+ abort();
+}
+
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
grpc_pollset_worker* worker, uint32_t read_mask,
uint32_t write_mask, grpc_fd_watcher* watcher) {
@@ -1710,6 +1716,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
+ false,
fd_create,
fd_wrapped_fd,
@@ -1717,6 +1724,7 @@ static const grpc_event_engine_vtable vtable = {
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
+ fd_notify_on_error,
fd_is_shutdown,
fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 6b7eca0afa..82b21df7ba 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -193,10 +193,15 @@ void grpc_event_engine_shutdown(void) {
g_event_engine = nullptr;
}
-grpc_fd* grpc_fd_create(int fd, const char* name) {
- GRPC_POLLING_API_TRACE("fd_create(%d, %s)", fd, name);
- GRPC_FD_TRACE("fd_create(%d, %s)", fd, name);
- return g_event_engine->fd_create(fd, name);
+bool grpc_event_engine_can_track_errors(void) {
+ return g_event_engine->can_track_err;
+}
+
+grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
+ GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
+ GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
+ GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err);
+ return g_event_engine->fd_create(fd, name, track_err);
}
int grpc_fd_wrapped_fd(grpc_fd* fd) {
@@ -231,6 +236,10 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
g_event_engine->fd_notify_on_write(fd, closure);
}
+void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
+ g_event_engine->fd_notify_on_error(fd, closure);
+}
+
static size_t pollset_size(void) { return g_event_engine->pollset_size; }
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 82cbce9a7b..e3996ce365 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -41,14 +41,16 @@ typedef struct grpc_fd grpc_fd;
typedef struct grpc_event_engine_vtable {
size_t pollset_size;
+ bool can_track_err;
- grpc_fd* (*fd_create)(int fd, const char* name);
+ grpc_fd* (*fd_create)(int fd, const char* name, bool track_err);
int (*fd_wrapped_fd)(grpc_fd* fd);
void (*fd_orphan)(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
bool already_closed, const char* reason);
void (*fd_shutdown)(grpc_fd* fd, grpc_error* why);
void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure);
void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure);
+ void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure);
bool (*fd_is_shutdown)(grpc_fd* fd);
grpc_pollset* (*fd_get_read_notifier_pollset)(grpc_fd* fd);
@@ -84,10 +86,20 @@ void grpc_event_engine_shutdown(void);
/* Return the name of the poll strategy */
const char* grpc_get_poll_strategy_name();
+/* Returns true if polling engine can track errors separately, false otherwise.
+ * If this is true, fd can be created with track_err set. After this, error
+ * events will be reported using fd_notify_on_error. If it is not set, errors
+ * will continue to be reported through fd_notify_on_read and
+ * fd_notify_on_write.
+ */
+bool grpc_event_engine_can_track_errors();
+
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
+ \a track_err if true means that error events would be tracked separately
+ using grpc_fd_notify_on_error. Currently, valid only for linux systems.
This takes ownership of closing fd. */
-grpc_fd* grpc_fd_create(int fd, const char* name);
+grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err);
/* Return the wrapped fd, or -1 if it has been released or closed. */
int grpc_fd_wrapped_fd(grpc_fd* fd);
@@ -126,6 +138,10 @@ void grpc_fd_notify_on_read(grpc_fd* fd, grpc_closure* closure);
/* Exactly the same semantics as above, except based on writable events. */
void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure);
+/* Exactly the same semantics as above, except based on error events. track_err
+ * needs to have been set on grpc_fd_create */
+void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure);
+
/* Return the read notifier pollset from the fd */
grpc_pollset* grpc_fd_get_read_notifier_pollset(grpc_fd* fd);
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 39da7f1637..71e08f1230 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -280,7 +280,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args,
}
addr_str = grpc_sockaddr_to_uri(mapped_addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
- *fdobj = grpc_fd_create(fd, name);
+ *fdobj = grpc_fd_create(fd, name, false);
gpr_free(name);
gpr_free(addr_str);
return GRPC_ERROR_NONE;
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 0a5caca906..ce18ff99e6 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) {
gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- grpc_fd* fdobj = grpc_fd_create(fd, name);
+ grpc_fd* fdobj = grpc_fd_create(fd, name, false);
read_notifier_pollset =
sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
@@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
listener->sibling = sp;
sp->server = listener->server;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
+ sp->emfd = grpc_fd_create(fd, name, false);
memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = listener->port_index;
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index 73afa15e65..b9f8145572 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd,
s->tail = sp;
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
+ sp->emfd = grpc_fd_create(fd, name, false);
memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
sp->port = port;
sp->port_index = port_index;
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 51d17eb174..99368020d4 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd,
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
gpr_free(addr_str);
- emfd_ = grpc_fd_create(fd, name);
+ emfd_ = grpc_fd_create(fd, name, false);
memcpy(&addr_, addr, sizeof(grpc_resolved_address));
GPR_ASSERT(emfd_);
gpr_free(name);
diff --git a/test/core/iomgr/ev_epollsig_linux_test.cc b/test/core/iomgr/ev_epollsig_linux_test.cc
index c3ba6d7c14..ac10494631 100644
--- a/test/core/iomgr/ev_epollsig_linux_test.cc
+++ b/test/core/iomgr/ev_epollsig_linux_test.cc
@@ -66,7 +66,7 @@ static void test_fd_init(test_fd* tfds, int* fds, int num_fds) {
for (i = 0; i < num_fds; i++) {
tfds[i].inner_fd = fds[i];
- tfds[i].fd = grpc_fd_create(fds[i], "test_fd");
+ tfds[i].fd = grpc_fd_create(fds[i], "test_fd", false);
}
}
@@ -267,7 +267,7 @@ static void test_threading(void) {
grpc_wakeup_fd fd;
GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&fd)));
shared.wakeup_fd = &fd;
- shared.wakeup_desc = grpc_fd_create(fd.read_fd, "wakeup");
+ shared.wakeup_desc = grpc_fd_create(fd.read_fd, "wakeup", false);
shared.wakeups = 0;
{
grpc_core::ExecCtx exec_ctx;
diff --git a/test/core/iomgr/fd_posix_test.cc b/test/core/iomgr/fd_posix_test.cc
index b81c73b2c0..4a625dd906 100644
--- a/test/core/iomgr/fd_posix_test.cc
+++ b/test/core/iomgr/fd_posix_test.cc
@@ -204,7 +204,7 @@ static void listen_cb(void* arg, /*=sv_arg*/
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
se = static_cast<session*>(gpr_malloc(sizeof(*se)));
se->sv = sv;
- se->em_fd = grpc_fd_create(fd, "listener");
+ se->em_fd = grpc_fd_create(fd, "listener", false);
grpc_pollset_add_fd(g_pollset, se->em_fd);
GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se,
grpc_schedule_on_exec_ctx);
@@ -233,7 +233,7 @@ static int server_start(server* sv) {
port = ntohs(sin.sin_port);
GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
- sv->em_fd = grpc_fd_create(fd, "server");
+ sv->em_fd = grpc_fd_create(fd, "server", false);
grpc_pollset_add_fd(g_pollset, sv->em_fd);
/* Register to be interested in reading from listen_fd. */
GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv,
@@ -353,7 +353,7 @@ static void client_start(client* cl, int port) {
}
}
- cl->em_fd = grpc_fd_create(fd, "client");
+ cl->em_fd = grpc_fd_create(fd, "client", false);
grpc_pollset_add_fd(g_pollset, cl->em_fd);
client_session_write(cl, GRPC_ERROR_NONE);
@@ -454,7 +454,7 @@ static void test_grpc_fd_change(void) {
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
- em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
+ em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false);
grpc_pollset_add_fd(g_pollset, em_fd);
/* Register the first callback, then make its FD readable */
diff --git a/test/core/iomgr/pollset_set_test.cc b/test/core/iomgr/pollset_set_test.cc
index 0dc75a5f3f..e2e63b2918 100644
--- a/test/core/iomgr/pollset_set_test.cc
+++ b/test/core/iomgr/pollset_set_test.cc
@@ -118,7 +118,7 @@ static void init_test_fds(test_fd* tfds, const int num_fds) {
for (int i = 0; i < num_fds; i++) {
GPR_ASSERT(GRPC_ERROR_NONE == grpc_wakeup_fd_init(&tfds[i].wakeup_fd));
tfds[i].fd = grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&tfds[i].wakeup_fd),
- "test_fd");
+ "test_fd", false);
reset_test_fd(&tfds[i]);
}
}
diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc
index f4df6fca23..3e87831e44 100644
--- a/test/core/iomgr/tcp_posix_test.cc
+++ b/test/core/iomgr/tcp_posix_test.cc
@@ -176,7 +176,8 @@ static void read_test(size_t num_bytes, size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER,
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test");
+ ep =
+ grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@@ -226,7 +227,8 @@ static void large_read_test(size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), &args, "test");
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test", false), &args,
+ "test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
written_bytes = fill_socket(sv[0]);
@@ -365,7 +367,8 @@ static void write_test(size_t num_bytes, size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER,
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), &args, "test");
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", false), &args,
+ "test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
state.ep = ep;
@@ -433,7 +436,8 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test");
+ ep =
+ grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test");
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
grpc_endpoint_add_to_pollset(ep, g_pollset);
@@ -522,10 +526,10 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- f.client_ep =
- grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), &args, "test");
- f.server_ep =
- grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), &args, "test");
+ f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client", false),
+ &args, "test");
+ f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server", false),
+ &args, "test");
grpc_resource_quota_unref_internal(resource_quota);
grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);
diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc
index bcb68ff229..a4383b83cb 100644
--- a/test/cpp/microbenchmarks/bm_pollset.cc
+++ b/test/cpp/microbenchmarks/bm_pollset.cc
@@ -141,7 +141,7 @@ static void BM_PollAddFd(benchmark::State& state) {
grpc_wakeup_fd wakeup_fd;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&wakeup_fd)));
- grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx");
+ grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx", false);
while (state.KeepRunning()) {
grpc_pollset_add_fd(ps, fd);
grpc_core::ExecCtx::Get()->Flush();
@@ -222,7 +222,7 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
grpc_core::ExecCtx exec_ctx;
grpc_wakeup_fd wakeup_fd;
GRPC_ERROR_UNREF(grpc_wakeup_fd_init(&wakeup_fd));
- grpc_fd* wakeup = grpc_fd_create(wakeup_fd.read_fd, "wakeup_read");
+ grpc_fd* wakeup = grpc_fd_create(wakeup_fd.read_fd, "wakeup_read", false);
grpc_pollset_add_fd(ps, wakeup);
bool done = false;
Closure* continue_closure = MakeClosure(