diff options
author | Yash Tibrewal <yashkt@google.com> | 2018-06-12 10:31:05 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-06-12 10:31:05 -0700 |
commit | a14729f22aa2fd9088fe90b82ad8a7c3b85f353c (patch) | |
tree | fd7607c47e0d103a68a3bc1a48d0cd49f5a632a2 | |
parent | a70053690a3f07ecf69ff283c288d018d2362ee4 (diff) | |
parent | dddf20793269a7ba5f658ae5fdcde2c175cf196b (diff) |
Merge pull request #15092 from yashykt/epollerr
Make linux polling engines capable of tracking errors separately
19 files changed, 180 insertions, 56 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index ebe2c4c41c..00a84302ed 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -284,7 +284,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node))); gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->fd = grpc_fd_create(socks[i], fd_name); + fdn->fd = grpc_fd_create(socks[i], fd_name, false); fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index b95c9dae53..dfed824cd5 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd( GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); grpc_endpoint* client = grpc_tcp_client_create_from_fd( - grpc_fd_create(fd, "client"), args, "fd-client"); + grpc_fd_create(fd, "client", false), args, "fd-client"); grpc_transport* transport = grpc_create_chttp2_transport(final_args, client, true); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index 371e463814..a0228785ee 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -43,8 +43,9 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, char* name; gpr_asprintf(&name, "fd:%d", fd); - grpc_endpoint* server_endpoint = grpc_tcp_create( - grpc_fd_create(fd, name), grpc_server_get_channel_args(server), name); + grpc_endpoint* server_endpoint = + grpc_tcp_create(grpc_fd_create(fd, name, false), + grpc_server_get_channel_args(server), name); gpr_free(name); diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc index 49850ab3a1..5c5c246f99 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.cc +++ b/src/core/lib/iomgr/endpoint_pair_posix.cc @@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name, grpc_core::ExecCtx exec_ctx; gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), args, + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), args, + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args, "socketpair-client"); gpr_free(final_name); diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index cf839619cd..98ab974057 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -136,6 +136,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; @@ -272,7 +273,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -286,11 +287,12 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd))); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } - new_fd->fd = fd; new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -307,7 +309,13 @@ static grpc_fd* fd_create(int fd, const char* name) { struct epoll_event ev; ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET); - ev.data.ptr = new_fd; + /* Use the least significant bit of ev.data.ptr to store track_err. We expect + * the addresses to be word aligned. We need to store track_err to avoid + * synchronization issues when accessing it after receiving an event. + * Accessing fd would be a data race there because the fd might have been + * returned to the free list at that point. */ + ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) | + (track_err ? 1 : 0)); if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) { gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno)); } @@ -327,6 +335,7 @@ static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why, shutdown(fd->fd, SHUT_RDWR); } fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -359,6 +368,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, grpc_iomgr_unregister_object(&fd->iomgr_object); fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -383,6 +393,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { fd->read_closure->SetReady(); /* Use release store to match with acquire load in fd_get_read_notifier */ @@ -391,6 +405,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + /******************************************************************************* * Pollset Definitions */ @@ -611,16 +627,25 @@ static grpc_error* process_epoll_events(grpc_pollset* pollset) { append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; + grpc_fd* fd = reinterpret_cast<grpc_fd*>( + reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1)); + bool track_err = + reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1); + bool cancel = (ev->events & EPOLLHUP) != 0; + bool error = (ev->events & EPOLLERR) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + + if (error && !err_fallback) { + fd_has_errors(fd); + } - if (read_ev || cancel) { + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1183,6 +1208,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1190,6 +1216,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 7903297fc6..f22eb8ee87 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -175,6 +175,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; grpc_closure* on_done_closure; @@ -184,6 +185,9 @@ struct grpc_fd { gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; + + /* Do we need to track EPOLLERR events separately? */ + bool track_err; }; static void fd_global_init(void); @@ -309,6 +313,7 @@ static void fd_destroy(void* arg, grpc_error* error) { fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } @@ -348,7 +353,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -362,6 +367,7 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd))); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } gpr_mu_init(&new_fd->pollable_mu); @@ -369,9 +375,11 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->pollable_obj = nullptr; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; + new_fd->track_err = track_err; new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -440,6 +448,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { shutdown(fd->fd, SHUT_RDWR); fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -452,6 +461,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + /******************************************************************************* * Pollable Definitions */ @@ -579,7 +592,12 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { struct epoll_event ev_fd; ev_fd.events = static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); - ev_fd.data.ptr = fd; + /* Use the second least significant bit of ev_fd.data.ptr to store track_err + * to avoid synchronization issues when accessing it after receiving an event. + * Accessing fd would be a data race there because the fd might have been + * returned to the free list at that point. */ + ev_fd.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) | + (fd->track_err ? 2 : 0)); GRPC_STATS_INC_SYSCALL_EPOLL_CTL(); if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { @@ -780,6 +798,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) { gpr_mu_lock(&fd->pollable_mu); grpc_error* error = GRPC_ERROR_NONE; @@ -848,20 +868,28 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, (intptr_t)data_ptr)), err_desc); } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; + grpc_fd* fd = + reinterpret_cast<grpc_fd*>(reinterpret_cast<intptr_t>(data_ptr) & ~2); + bool track_err = reinterpret_cast<intptr_t>(data_ptr) & 2; + bool cancel = (ev->events & EPOLLHUP) != 0; + bool error = (ev->events & EPOLLERR) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "PS:%p got fd %p: cancel=%d read=%d " "write=%d", pollset, fd, cancel, read_ev, write_ev); } - if (read_ev || cancel) { + if (error && !err_fallback) { + fd_has_errors(fd); + } + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1503,6 +1531,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1510,6 +1539,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index a144817a83..20512e31ac 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -132,6 +132,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; grpc_closure* on_done_closure; @@ -141,6 +142,9 @@ struct grpc_fd { gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; + + /* Do we need to track EPOLLERR events separately? */ + bool track_err; }; /* Reference counting for fds */ @@ -352,7 +356,10 @@ static void polling_island_add_fds_locked(polling_island* pi, grpc_fd** fds, for (i = 0; i < fd_count; i++) { ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET); - ev.data.ptr = fds[i]; + /* Use the least significant bit of ev.data.ptr to store track_err to avoid + * synchronization issues when accessing it after receiving an event */ + ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fds[i]) | + (fds[i]->track_err ? 1 : 0)); err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev); if (err < 0) { @@ -769,6 +776,7 @@ static void unref_by(grpc_fd* fd, int n) { fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } else { @@ -806,7 +814,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -821,6 +829,7 @@ static grpc_fd* fd_create(int fd, const char* name) { gpr_mu_init(&new_fd->po.mu); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } /* Note: It is not really needed to get the new_fd->po.mu lock here. If this @@ -837,6 +846,8 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->orphaned = false; new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); + new_fd->track_err = track_err; gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -933,6 +944,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { shutdown(fd->fd, SHUT_RDWR); fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -945,6 +957,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + /******************************************************************************* * Pollset Definitions */ @@ -1116,6 +1132,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + static void pollset_release_polling_island(grpc_pollset* ps, const char* reason) { if (ps->po.pi != nullptr) { @@ -1254,14 +1272,23 @@ static void pollset_work_and_unlock(grpc_pollset* pollset, to the function pollset_work_and_unlock() will pick up the correct epoll_fd */ } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write_ev = ep_ev[i].events & EPOLLOUT; - if (read_ev || cancel) { + grpc_fd* fd = reinterpret_cast<grpc_fd*>( + reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1)); + bool track_err = + reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1); + bool cancel = (ep_ev[i].events & EPOLLHUP) != 0; + bool error = (ep_ev[i].events & EPOLLERR) != 0; + bool read_ev = (ep_ev[i].events & (EPOLLIN | EPOLLPRI)) != 0; + bool write_ev = (ep_ev[i].events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + + if (error && !err_fallback) { + fd_has_errors(fd); + } + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1634,6 +1661,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1641,6 +1669,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 70958ed562..df6b0e1e89 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -330,7 +330,8 @@ static void unref_by(grpc_fd* fd, int n) { } } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { + GPR_DEBUG_ASSERT(track_err == false); grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r))); gpr_mu_init(&r->mu); gpr_atm_rel_store(&r->refst, 1); @@ -553,6 +554,11 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { gpr_mu_unlock(&fd->mu); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + gpr_log(GPR_ERROR, "Polling engine does not support tracking errors."); + abort(); +} + static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset, grpc_pollset_worker* worker, uint32_t read_mask, uint32_t write_mask, grpc_fd_watcher* watcher) { @@ -1710,6 +1716,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + false, fd_create, fd_wrapped_fd, @@ -1717,6 +1724,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 6b7eca0afa..82b21df7ba 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -193,10 +193,15 @@ void grpc_event_engine_shutdown(void) { g_event_engine = nullptr; } -grpc_fd* grpc_fd_create(int fd, const char* name) { - GRPC_POLLING_API_TRACE("fd_create(%d, %s)", fd, name); - GRPC_FD_TRACE("fd_create(%d, %s)", fd, name); - return g_event_engine->fd_create(fd, name); +bool grpc_event_engine_can_track_errors(void) { + return g_event_engine->can_track_err; +} + +grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) { + GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); + GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); + GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err); + return g_event_engine->fd_create(fd, name, track_err); } int grpc_fd_wrapped_fd(grpc_fd* fd) { @@ -231,6 +236,10 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { g_event_engine->fd_notify_on_write(fd, closure); } +void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + g_event_engine->fd_notify_on_error(fd, closure); +} + static size_t pollset_size(void) { return g_event_engine->pollset_size; } static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 82cbce9a7b..e3996ce365 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -41,14 +41,16 @@ typedef struct grpc_fd grpc_fd; typedef struct grpc_event_engine_vtable { size_t pollset_size; + bool can_track_err; - grpc_fd* (*fd_create)(int fd, const char* name); + grpc_fd* (*fd_create)(int fd, const char* name, bool track_err); int (*fd_wrapped_fd)(grpc_fd* fd); void (*fd_orphan)(grpc_fd* fd, grpc_closure* on_done, int* release_fd, bool already_closed, const char* reason); void (*fd_shutdown)(grpc_fd* fd, grpc_error* why); void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure); void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure); + void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure); bool (*fd_is_shutdown)(grpc_fd* fd); grpc_pollset* (*fd_get_read_notifier_pollset)(grpc_fd* fd); @@ -84,10 +86,20 @@ void grpc_event_engine_shutdown(void); /* Return the name of the poll strategy */ const char* grpc_get_poll_strategy_name(); +/* Returns true if polling engine can track errors separately, false otherwise. + * If this is true, fd can be created with track_err set. After this, error + * events will be reported using fd_notify_on_error. If it is not set, errors + * will continue to be reported through fd_notify_on_read and + * fd_notify_on_write. + */ +bool grpc_event_engine_can_track_errors(); + /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. + \a track_err if true means that error events would be tracked separately + using grpc_fd_notify_on_error. Currently, valid only for linux systems. This takes ownership of closing fd. */ -grpc_fd* grpc_fd_create(int fd, const char* name); +grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err); /* Return the wrapped fd, or -1 if it has been released or closed. */ int grpc_fd_wrapped_fd(grpc_fd* fd); @@ -126,6 +138,10 @@ void grpc_fd_notify_on_read(grpc_fd* fd, grpc_closure* closure); /* Exactly the same semantics as above, except based on writable events. */ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure); +/* Exactly the same semantics as above, except based on error events. track_err + * needs to have been set on grpc_fd_create */ +void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure); + /* Return the read notifier pollset from the fd */ grpc_pollset* grpc_fd_get_read_notifier_pollset(grpc_fd* fd); diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 39da7f1637..71e08f1230 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -280,7 +280,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args, } addr_str = grpc_sockaddr_to_uri(mapped_addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); - *fdobj = grpc_fd_create(fd, name); + *fdobj = grpc_fd_create(fd, name, false); gpr_free(name); gpr_free(addr_str); return GRPC_ERROR_NONE; diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 0a5caca906..ce18ff99e6 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) { gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str); } - grpc_fd* fdobj = grpc_fd_create(fd, name); + grpc_fd* fdobj = grpc_fd_create(fd, name, false); read_notifier_pollset = sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add( @@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) { listener->sibling = sp; sp->server = listener->server; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); + sp->emfd = grpc_fd_create(fd, name, false); memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = listener->port_index; diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 73afa15e65..b9f8145572 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd, s->tail = sp; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); + sp->emfd = grpc_fd_create(fd, name, false); memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = port_index; diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 51d17eb174..99368020d4 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd, grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_free(addr_str); - emfd_ = grpc_fd_create(fd, name); + emfd_ = grpc_fd_create(fd, name, false); memcpy(&addr_, addr, sizeof(grpc_resolved_address)); GPR_ASSERT(emfd_); gpr_free(name); diff --git a/test/core/iomgr/ev_epollsig_linux_test.cc b/test/core/iomgr/ev_epollsig_linux_test.cc index c3ba6d7c14..ac10494631 100644 --- a/test/core/iomgr/ev_epollsig_linux_test.cc +++ b/test/core/iomgr/ev_epollsig_linux_test.cc @@ -66,7 +66,7 @@ static void test_fd_init(test_fd* tfds, int* fds, int num_fds) { for (i = 0; i < num_fds; i++) { tfds[i].inner_fd = fds[i]; - tfds[i].fd = grpc_fd_create(fds[i], "test_fd"); + tfds[i].fd = grpc_fd_create(fds[i], "test_fd", false); } } @@ -267,7 +267,7 @@ static void test_threading(void) { grpc_wakeup_fd fd; GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&fd))); shared.wakeup_fd = &fd; - shared.wakeup_desc = grpc_fd_create(fd.read_fd, "wakeup"); + shared.wakeup_desc = grpc_fd_create(fd.read_fd, "wakeup", false); shared.wakeups = 0; { grpc_core::ExecCtx exec_ctx; diff --git a/test/core/iomgr/fd_posix_test.cc b/test/core/iomgr/fd_posix_test.cc index b81c73b2c0..4a625dd906 100644 --- a/test/core/iomgr/fd_posix_test.cc +++ b/test/core/iomgr/fd_posix_test.cc @@ -204,7 +204,7 @@ static void listen_cb(void* arg, /*=sv_arg*/ fcntl(fd, F_SETFL, flags | O_NONBLOCK); se = static_cast<session*>(gpr_malloc(sizeof(*se))); se->sv = sv; - se->em_fd = grpc_fd_create(fd, "listener"); + se->em_fd = grpc_fd_create(fd, "listener", false); grpc_pollset_add_fd(g_pollset, se->em_fd); GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se, grpc_schedule_on_exec_ctx); @@ -233,7 +233,7 @@ static int server_start(server* sv) { port = ntohs(sin.sin_port); GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); - sv->em_fd = grpc_fd_create(fd, "server"); + sv->em_fd = grpc_fd_create(fd, "server", false); grpc_pollset_add_fd(g_pollset, sv->em_fd); /* Register to be interested in reading from listen_fd. */ GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv, @@ -353,7 +353,7 @@ static void client_start(client* cl, int port) { } } - cl->em_fd = grpc_fd_create(fd, "client"); + cl->em_fd = grpc_fd_create(fd, "client", false); grpc_pollset_add_fd(g_pollset, cl->em_fd); client_session_write(cl, GRPC_ERROR_NONE); @@ -454,7 +454,7 @@ static void test_grpc_fd_change(void) { flags = fcntl(sv[1], F_GETFL, 0); GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); - em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change"); + em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change", false); grpc_pollset_add_fd(g_pollset, em_fd); /* Register the first callback, then make its FD readable */ diff --git a/test/core/iomgr/pollset_set_test.cc b/test/core/iomgr/pollset_set_test.cc index 0dc75a5f3f..e2e63b2918 100644 --- a/test/core/iomgr/pollset_set_test.cc +++ b/test/core/iomgr/pollset_set_test.cc @@ -118,7 +118,7 @@ static void init_test_fds(test_fd* tfds, const int num_fds) { for (int i = 0; i < num_fds; i++) { GPR_ASSERT(GRPC_ERROR_NONE == grpc_wakeup_fd_init(&tfds[i].wakeup_fd)); tfds[i].fd = grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&tfds[i].wakeup_fd), - "test_fd"); + "test_fd", false); reset_test_fd(&tfds[i]); } } diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc index f4df6fca23..3e87831e44 100644 --- a/test/core/iomgr/tcp_posix_test.cc +++ b/test/core/iomgr/tcp_posix_test.cc @@ -176,7 +176,8 @@ static void read_test(size_t num_bytes, size_t slice_size) { a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = static_cast<int>(slice_size); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test"); + ep = + grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test"); grpc_endpoint_add_to_pollset(ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); @@ -226,7 +227,8 @@ static void large_read_test(size_t slice_size) { a[0].type = GRPC_ARG_INTEGER; a[0].value.integer = static_cast<int>(slice_size); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), &args, "test"); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test", false), &args, + "test"); grpc_endpoint_add_to_pollset(ep, g_pollset); written_bytes = fill_socket(sv[0]); @@ -365,7 +367,8 @@ static void write_test(size_t num_bytes, size_t slice_size) { a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = static_cast<int>(slice_size); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), &args, "test"); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", false), &args, + "test"); grpc_endpoint_add_to_pollset(ep, g_pollset); state.ep = ep; @@ -433,7 +436,8 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { a[0].type = GRPC_ARG_INTEGER; a[0].value.integer = static_cast<int>(slice_size); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test"); + ep = + grpc_tcp_create(grpc_fd_create(sv[1], "read_test", false), &args, "test"); GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); grpc_endpoint_add_to_pollset(ep, g_pollset); @@ -522,10 +526,10 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( a[0].type = GRPC_ARG_INTEGER; a[0].value.integer = static_cast<int>(slice_size); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - f.client_ep = - grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), &args, "test"); - f.server_ep = - grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), &args, "test"); + f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client", false), + &args, "test"); + f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server", false), + &args, "test"); grpc_resource_quota_unref_internal(resource_quota); grpc_endpoint_add_to_pollset(f.client_ep, g_pollset); grpc_endpoint_add_to_pollset(f.server_ep, g_pollset); diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc index bcb68ff229..a4383b83cb 100644 --- a/test/cpp/microbenchmarks/bm_pollset.cc +++ b/test/cpp/microbenchmarks/bm_pollset.cc @@ -141,7 +141,7 @@ static void BM_PollAddFd(benchmark::State& state) { grpc_wakeup_fd wakeup_fd; GPR_ASSERT( GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&wakeup_fd))); - grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx"); + grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx", false); while (state.KeepRunning()) { grpc_pollset_add_fd(ps, fd); grpc_core::ExecCtx::Get()->Flush(); @@ -222,7 +222,7 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) { grpc_core::ExecCtx exec_ctx; grpc_wakeup_fd wakeup_fd; GRPC_ERROR_UNREF(grpc_wakeup_fd_init(&wakeup_fd)); - grpc_fd* wakeup = grpc_fd_create(wakeup_fd.read_fd, "wakeup_read"); + grpc_fd* wakeup = grpc_fd_create(wakeup_fd.read_fd, "wakeup_read", false); grpc_pollset_add_fd(ps, wakeup); bool done = false; Closure* continue_closure = MakeClosure( |