diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.cc')
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 98369ddd6e..4c6cff7fe2 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -63,6 +63,7 @@ // a keepalive ping timeout issue. We may want to revert https://github // .com/grpc/grpc/pull/14943 once we figure out the root cause. #define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16 +#define MAX_PROBE_EPOLL_FDS 32 grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, "pollable_refcount"); @@ -75,6 +76,12 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; typedef struct pollable pollable; +typedef struct cached_fd { + intptr_t salt; + int fd; + uint64_t last_used; +} cached_fd; + /// A pollable is something that can be polled: it has an epoll set to poll on, /// and a wakeup fd for kicks /// There are three broad types: @@ -103,6 +110,11 @@ struct pollable { int event_cursor; int event_count; struct epoll_event events[MAX_EPOLL_EVENTS]; + + // Maintain a LRU-eviction cache of fds in this pollable + cached_fd fd_cache[MAX_PROBE_EPOLL_FDS]; + int fd_cache_size; + uint64_t fd_cache_counter; }; static const char* pollable_type_string(pollable_type t) { @@ -145,8 +157,11 @@ static void pollable_unref(pollable* p, int line, const char* reason); * Fd Declarations */ +static gpr_atm g_fd_salt; + struct grpc_fd { int fd; + intptr_t salt; /* refst format: bit 0 : 1=Active / 0=Orphaned bits 1-n : refcount @@ -354,6 +369,7 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->pollable_obj = nullptr; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; + new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); @@ -484,6 +500,8 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { (*p)->root_worker = nullptr; (*p)->event_cursor = 0; (*p)->event_count = 0; + (*p)->fd_cache_size = 0; + (*p)->fd_cache_counter = 0; return GRPC_ERROR_NONE; } @@ -524,7 +542,36 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { grpc_error* error = GRPC_ERROR_NONE; static const char* err_desc = "pollable_add_fd"; const int epfd = p->epfd; + gpr_mu_lock(&p->mu); + p->fd_cache_counter++; + // Handle the case of overflow for our cache counter by + // reseting the recency-counter on all cache objects + if (p->fd_cache_counter == 0) { + for (int i = 0; i < p->fd_cache_size; i++) { + p->fd_cache[i].last_used = 0; + } + } + int lru_idx = 0; + for (int i = 0; i < p->fd_cache_size; i++) { + if (p->fd_cache[i].fd == fd->fd && p->fd_cache[i].salt == fd->salt) { + GRPC_STATS_INC_POLLSET_FD_CACHE_HITS(); + p->fd_cache[i].last_used = p->fd_cache_counter; + gpr_mu_unlock(&p->mu); + return GRPC_ERROR_NONE; + } else if (p->fd_cache[i].last_used < p->fd_cache[lru_idx].last_used) { + lru_idx = i; + } + } + // Add to cache + if (p->fd_cache_size < MAX_PROBE_EPOLL_FDS) { + lru_idx = p->fd_cache_size; + p->fd_cache_size++; + } + p->fd_cache[lru_idx].fd = fd->fd; + p->fd_cache[lru_idx].salt = fd->salt; + p->fd_cache[lru_idx].last_used = p->fd_cache_counter; + gpr_mu_unlock(&p->mu); if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } @@ -533,6 +580,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { ev_fd.events = static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); ev_fd.data.ptr = fd; + GRPC_STATS_INC_SYSCALL_EPOLL_CTL(); if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { case EEXIST: |