diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.cc')
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 117 |
1 files changed, 96 insertions, 21 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 7903297fc6..55a2b98372 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -63,7 +63,7 @@ // a keepalive ping timeout issue. We may want to revert https://github // .com/grpc/grpc/pull/14943 once we figure out the root cause. #define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16 -#define MAX_PROBE_EPOLL_FDS 32 +#define MAX_FDS_IN_CACHE 32 grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, "pollable_refcount"); @@ -77,8 +77,14 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; typedef struct pollable pollable; typedef struct cached_fd { + // Set to the grpc_fd's salt value. See 'salt' variable' in grpc_fd for more + // details intptr_t salt; + + // The underlying fd int fd; + + // A recency time counter that helps to determine the LRU fd in the cache uint64_t last_used; } cached_fd; @@ -111,10 +117,32 @@ struct pollable { int event_count; struct epoll_event events[MAX_EPOLL_EVENTS]; - // Maintain a LRU-eviction cache of fds in this pollable - cached_fd fd_cache[MAX_PROBE_EPOLL_FDS]; + // We may be calling pollable_add_fd() on the same (pollable, fd) multiple + // times. To prevent pollable_add_fd() from making multiple sys calls to + // epoll_ctl() to add the fd, we maintain a cache of what fds are already + // present in the underlying epoll-set. + // + // Since this is not a correctness issue, we do not need to maintain all the + // fds in the cache. Hence we just use an LRU cache of size 'MAX_FDS_IN_CACHE' + // + // NOTE: An ideal implementation of this should do the following: + // 1) Add fds to the cache in pollable_add_fd() function (i.e whenever the fd + // is added to the pollable's epoll set) + // 2) Remove the fd from the cache whenever the fd is removed from the + // underlying epoll set (i.e whenever fd_orphan() is called). + // + // Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a + // lot of complexity since an fd can be present in multiple pollalbles. So our + // implementation ONLY DOES (1) and NOT (2). + // + // The cache_fd.salt variable helps here to maintain correctness (it serves as + // an epoch that differentiates one grpc_fd from the other even though both of + // them may have the same fd number) + // + // The following implements LRU-eviction cache of fds in this pollable + cached_fd fd_cache[MAX_FDS_IN_CACHE]; int fd_cache_size; - uint64_t fd_cache_counter; + uint64_t fd_cache_counter; // Recency timer tick counter }; static const char* pollable_type_string(pollable_type t) { @@ -157,15 +185,24 @@ static void pollable_unref(pollable* p, int line, const char* reason); * Fd Declarations */ +// Monotonically increasing Epoch counter that is assinged to each grpc_fd. See +// the description of 'salt' variable in 'grpc_fd' for more details +// TODO: (sreek/kpayson) gpr_atm is intptr_t which may not be wide-enough on +// 32-bit systems. Change this to int_64 - atleast on 32-bit systems static gpr_atm g_fd_salt; struct grpc_fd { int fd; + + // Since fd numbers can be reused (after old fds are closed), this serves as + // an epoch that uniquely identifies this fd (i.e the pair (salt, fd) is + // unique (until the salt counter (i.e g_fd_salt) overflows) intptr_t salt; - /* refst format: - bit 0 : 1=Active / 0=Orphaned - bits 1-n : refcount - Ref/Unref by two to avoid altering the orphaned bit */ + + // refst format: + // bit 0 : 1=Active / 0=Orphaned + // bits 1-n : refcount + // Ref/Unref by two to avoid altering the orphaned bit gpr_atm refst; gpr_mu orphan_mu; @@ -175,15 +212,19 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; grpc_closure* on_done_closure; - /* The pollset that last noticed that the fd is readable. The actual type - * stored in this is (grpc_pollset *) */ + // The pollset that last noticed that the fd is readable. The actual type + // stored in this is (grpc_pollset *) gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; + + // Do we need to track EPOLLERR events separately? + bool track_err; }; static void fd_global_init(void); @@ -309,6 +350,7 @@ static void fd_destroy(void* arg, grpc_error* error) { fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } @@ -348,7 +390,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -362,6 +404,7 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd))); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } gpr_mu_init(&new_fd->pollable_mu); @@ -369,9 +412,11 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->pollable_obj = nullptr; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; + new_fd->track_err = track_err; new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -395,8 +440,8 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { - bool is_fd_closed = already_closed; + const char* reason) { + bool is_fd_closed = false; gpr_mu_lock(&fd->orphan_mu); @@ -406,7 +451,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, descriptor fd->fd (but we still own the grpc_fd structure). */ if (release_fd != nullptr) { *release_fd = fd->fd; - } else if (!is_fd_closed) { + } else { close(fd->fd); is_fd_closed = true; } @@ -438,8 +483,14 @@ static bool fd_is_shutdown(grpc_fd* fd) { /* Might be called multiple times */ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { - shutdown(fd->fd, SHUT_RDWR); + if (shutdown(fd->fd, SHUT_RDWR)) { + if (errno != ENOTCONN) { + gpr_log(GPR_ERROR, "Error shutting down fd %d. errno: %d", + grpc_fd_wrapped_fd(fd), errno); + } + } fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -452,6 +503,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + /******************************************************************************* * Pollable Definitions */ @@ -544,6 +599,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { const int epfd = p->epfd; gpr_mu_lock(&p->mu); p->fd_cache_counter++; + // Handle the case of overflow for our cache counter by // reseting the recency-counter on all cache objects if (p->fd_cache_counter == 0) { @@ -563,8 +619,9 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { lru_idx = i; } } + // Add to cache - if (p->fd_cache_size < MAX_PROBE_EPOLL_FDS) { + if (p->fd_cache_size < MAX_FDS_IN_CACHE) { lru_idx = p->fd_cache_size; p->fd_cache_size++; } @@ -572,6 +629,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { p->fd_cache[lru_idx].salt = fd->salt; p->fd_cache[lru_idx].last_used = p->fd_cache_counter; gpr_mu_unlock(&p->mu); + if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } @@ -579,7 +637,12 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { struct epoll_event ev_fd; ev_fd.events = static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); - ev_fd.data.ptr = fd; + /* Use the second least significant bit of ev_fd.data.ptr to store track_err + * to avoid synchronization issues when accessing it after receiving an event. + * Accessing fd would be a data race there because the fd might have been + * returned to the free list at that point. */ + ev_fd.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) | + (fd->track_err ? 2 : 0)); GRPC_STATS_INC_SYSCALL_EPOLL_CTL(); if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { @@ -780,6 +843,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) { gpr_mu_lock(&fd->pollable_mu); grpc_error* error = GRPC_ERROR_NONE; @@ -848,20 +913,28 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, (intptr_t)data_ptr)), err_desc); } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; + grpc_fd* fd = + reinterpret_cast<grpc_fd*>(reinterpret_cast<intptr_t>(data_ptr) & ~2); + bool track_err = reinterpret_cast<intptr_t>(data_ptr) & 2; + bool cancel = (ev->events & EPOLLHUP) != 0; + bool error = (ev->events & EPOLLERR) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "PS:%p got fd %p: cancel=%d read=%d " "write=%d", pollset, fd, cancel, read_ev, write_ev); } - if (read_ev || cancel) { + if (error && !err_fallback) { + fd_has_errors(fd); + } + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1503,6 +1576,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1510,6 +1584,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, |