aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/ev_epollex_linux.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.cc')
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc112
1 files changed, 84 insertions, 28 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 44d8cf2b1e..12f23ea1d6 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -63,6 +63,7 @@
// a keepalive ping timeout issue. We may want to revert https://github
// .com/grpc/grpc/pull/14943 once we figure out the root cause.
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16
+#define MAX_PROBE_EPOLL_FDS 32
grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false,
"pollable_refcount");
@@ -75,6 +76,12 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type;
typedef struct pollable pollable;
+typedef struct cached_fd {
+ intptr_t salt;
+ int fd;
+ uint64_t last_used;
+} cached_fd;
+
/// A pollable is something that can be polled: it has an epoll set to poll on,
/// and a wakeup fd for kicks
/// There are three broad types:
@@ -103,6 +110,11 @@ struct pollable {
int event_cursor;
int event_count;
struct epoll_event events[MAX_EPOLL_EVENTS];
+
+ // Maintain a LRU-eviction cache of fds in this pollable
+ cached_fd fd_cache[MAX_PROBE_EPOLL_FDS];
+ int fd_cache_size;
+ uint64_t fd_cache_counter;
};
static const char* pollable_type_string(pollable_type t) {
@@ -145,8 +157,11 @@ static void pollable_unref(pollable* p, int line, const char* reason);
* Fd Declarations
*/
+static gpr_atm g_fd_salt;
+
struct grpc_fd {
int fd;
+ intptr_t salt;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
bits 1-n : refcount
@@ -354,6 +369,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->pollable_obj = nullptr;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
+ new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1);
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
@@ -447,9 +463,13 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) {
if (epfd == -1) {
return GRPC_OS_ERROR(errno, "epoll_create1");
}
+ GRPC_FD_TRACE("Pollable_create: created epfd: %d (type: %d)", epfd, type);
*p = static_cast<pollable*>(gpr_malloc(sizeof(**p)));
grpc_error* err = grpc_wakeup_fd_init(&(*p)->wakeup);
if (err != GRPC_ERROR_NONE) {
+ GRPC_FD_TRACE(
+ "Pollable_create: closed epfd: %d (type: %d). wakeupfd_init error",
+ epfd, type);
close(epfd);
gpr_free(*p);
*p = nullptr;
@@ -460,6 +480,9 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) {
ev.data.ptr = (void*)(1 | (intptr_t) & (*p)->wakeup);
if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) {
err = GRPC_OS_ERROR(errno, "epoll_ctl");
+ GRPC_FD_TRACE(
+ "Pollable_create: closed epfd: %d (type: %d). epoll_ctl error", epfd,
+ type);
close(epfd);
grpc_wakeup_fd_destroy(&(*p)->wakeup);
gpr_free(*p);
@@ -477,6 +500,8 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) {
(*p)->root_worker = nullptr;
(*p)->event_cursor = 0;
(*p)->event_count = 0;
+ (*p)->fd_cache_size = 0;
+ (*p)->fd_cache_counter = 0;
return GRPC_ERROR_NONE;
}
@@ -506,6 +531,7 @@ static void pollable_unref(pollable* p, int line, const char* reason) {
}
#endif
if (p != nullptr && gpr_unref(&p->refs)) {
+ GRPC_FD_TRACE("pollable_unref: Closing epfd: %d", p->epfd);
close(p->epfd);
grpc_wakeup_fd_destroy(&p->wakeup);
gpr_free(p);
@@ -516,15 +542,45 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
grpc_error* error = GRPC_ERROR_NONE;
static const char* err_desc = "pollable_add_fd";
const int epfd = p->epfd;
+ gpr_mu_lock(&p->mu);
+ p->fd_cache_counter++;
+ // Handle the case of overflow for our cache counter by
+ // reseting the recency-counter on all cache objects
+ if (p->fd_cache_counter == 0) {
+ for (int i = 0; i < p->fd_cache_size; i++) {
+ p->fd_cache[i].last_used = 0;
+ }
+ }
+ int lru_idx = 0;
+ for (int i = 0; i < p->fd_cache_size; i++) {
+ if (p->fd_cache[i].fd == fd->fd && p->fd_cache[i].salt == fd->salt) {
+ GRPC_STATS_INC_POLLSET_FD_CACHE_HITS();
+ p->fd_cache[i].last_used = p->fd_cache_counter;
+ gpr_mu_unlock(&p->mu);
+ return GRPC_ERROR_NONE;
+ } else if (p->fd_cache[i].last_used < p->fd_cache[lru_idx].last_used) {
+ lru_idx = i;
+ }
+ }
+ // Add to cache
+ if (p->fd_cache_size < MAX_PROBE_EPOLL_FDS) {
+ lru_idx = p->fd_cache_size;
+ p->fd_cache_size++;
+ }
+ p->fd_cache[lru_idx].fd = fd->fd;
+ p->fd_cache[lru_idx].salt = fd->salt;
+ p->fd_cache[lru_idx].last_used = p->fd_cache_counter;
+ gpr_mu_unlock(&p->mu);
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
+ gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
}
struct epoll_event ev_fd;
ev_fd.events =
static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
ev_fd.data.ptr = fd;
+ GRPC_STATS_INC_SYSCALL_EPOLL_CTL();
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
switch (errno) {
case EEXIST:
@@ -560,7 +616,7 @@ static void pollset_global_shutdown(void) {
/* pollset->mu must be held while calling this function */
static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) "
"rw=%p (target:NULL) cpsc=%d (target:0)",
pollset, pollset->active_pollable, pollset->shutdown_closure,
@@ -585,14 +641,14 @@ static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
GPR_ASSERT(specific_worker != nullptr);
if (specific_worker->kicked) {
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p);
+ gpr_log(GPR_INFO, "PS:%p kicked_specific_but_already_kicked", p);
}
GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
return GRPC_ERROR_NONE;
}
if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p);
+ gpr_log(GPR_INFO, "PS:%p kicked_specific_but_awake", p);
}
GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
specific_worker->kicked = true;
@@ -601,7 +657,7 @@ static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
if (specific_worker == p->root_worker) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p);
+ gpr_log(GPR_INFO, "PS:%p kicked_specific_via_wakeup_fd", p);
}
specific_worker->kicked = true;
grpc_error* error = grpc_wakeup_fd_wakeup(&p->wakeup);
@@ -610,7 +666,7 @@ static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
if (specific_worker->initialized_cv) {
GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p);
+ gpr_log(GPR_INFO, "PS:%p kicked_specific_via_cv", p);
}
specific_worker->kicked = true;
gpr_cv_signal(&specific_worker->cv);
@@ -626,7 +682,7 @@ static grpc_error* pollset_kick(grpc_pollset* pollset,
GPR_TIMER_SCOPE("pollset_kick", 0);
GRPC_STATS_INC_POLLSET_KICK();
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p",
pollset, specific_worker,
(void*)gpr_tls_get(&g_current_thread_pollset),
@@ -636,7 +692,7 @@ static grpc_error* pollset_kick(grpc_pollset* pollset,
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
if (pollset->root_worker == nullptr) {
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", pollset);
+ gpr_log(GPR_INFO, "PS:%p kicked_any_without_poller", pollset);
}
GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
pollset->kicked_without_poller = true;
@@ -662,7 +718,7 @@ static grpc_error* pollset_kick(grpc_pollset* pollset,
}
} else {
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", pollset);
+ gpr_log(GPR_INFO, "PS:%p kicked_any_but_awake", pollset);
}
GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
return GRPC_ERROR_NONE;
@@ -784,7 +840,7 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
void* data_ptr = ev->data.ptr;
if (1 & (intptr_t)data_ptr) {
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr);
+ gpr_log(GPR_INFO, "PS:%p got pollset_wakeup %p", pollset, data_ptr);
}
append_error(&error,
grpc_wakeup_fd_consume_wakeup(
@@ -797,7 +853,7 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset,
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"PS:%p got fd %p: cancel=%d read=%d "
"write=%d",
pollset, fd, cancel, read_ev, write_ev);
@@ -827,7 +883,7 @@ static grpc_error* pollable_epoll(pollable* p, grpc_millis deadline) {
if (grpc_polling_trace.enabled()) {
char* desc = pollable_desc(p);
- gpr_log(GPR_DEBUG, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout);
+ gpr_log(GPR_INFO, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout);
gpr_free(desc);
}
@@ -846,7 +902,7 @@ static grpc_error* pollable_epoll(pollable* p, grpc_millis deadline) {
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "POLLABLE:%p got %d events", p, r);
+ gpr_log(GPR_INFO, "POLLABLE:%p got %d events", p, r);
}
p->event_cursor = 0;
@@ -917,7 +973,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
gpr_mu_unlock(&pollset->mu);
if (grpc_polling_trace.enabled() &&
worker->pollable_obj->root_worker != worker) {
- gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset,
+ gpr_log(GPR_INFO, "PS:%p wait %p w=%p for %dms", pollset,
worker->pollable_obj, worker,
poll_deadline_to_millis_timeout(deadline));
}
@@ -925,19 +981,19 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset,
+ gpr_log(GPR_INFO, "PS:%p timeout_wait %p w=%p", pollset,
worker->pollable_obj, worker);
}
do_poll = false;
} else if (worker->kicked) {
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset,
+ gpr_log(GPR_INFO, "PS:%p wakeup %p w=%p", pollset,
worker->pollable_obj, worker);
}
do_poll = false;
} else if (grpc_polling_trace.enabled() &&
worker->pollable_obj->root_worker != worker) {
- gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset,
+ gpr_log(GPR_INFO, "PS:%p spurious_wakeup %p w=%p", pollset,
worker->pollable_obj, worker);
}
}
@@ -1009,8 +1065,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
WORKER_PTR->originator = gettid();
#endif
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR
+ gpr_log(GPR_INFO,
+ "PS:%p work hdl=%p worker=%p now=%" PRId64 " deadline=%" PRId64
" kwp=%d pollable=%p",
pollset, worker_hdl, WORKER_PTR, grpc_core::ExecCtx::Get()->Now(),
deadline, pollset->kicked_without_poller, pollset->active_pollable);
@@ -1050,7 +1106,7 @@ static grpc_error* pollset_transition_pollable_from_empty_to_fd_locked(
static const char* err_desc = "pollset_transition_pollable_from_empty_to_fd";
grpc_error* error = GRPC_ERROR_NONE;
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"PS:%p add fd %p (%d); transition pollable from empty to fd",
pollset, fd, fd->fd);
}
@@ -1067,7 +1123,7 @@ static grpc_error* pollset_transition_pollable_from_fd_to_multi_locked(
grpc_error* error = GRPC_ERROR_NONE;
if (grpc_polling_trace.enabled()) {
gpr_log(
- GPR_DEBUG,
+ GPR_INFO,
"PS:%p add fd %p (%d); transition pollable from fd %p to multipoller",
pollset, and_add_fd, and_add_fd ? and_add_fd->fd : -1,
pollset->active_pollable->owner_fd);
@@ -1137,7 +1193,7 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset,
/* Any workers currently polling on this pollset must now be woked up so
* that they can pick up the new active_pollable */
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
+ gpr_log(GPR_INFO,
"PS:%p active pollable transition from empty to multi",
pollset);
}
@@ -1224,7 +1280,7 @@ static void pollset_set_unref(grpc_pollset_set* pss) {
static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {
GPR_TIMER_SCOPE("pollset_set_add_fd", 0);
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd);
+ gpr_log(GPR_INFO, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd);
}
grpc_error* error = GRPC_ERROR_NONE;
static const char* err_desc = "pollset_set_add_fd";
@@ -1248,7 +1304,7 @@ static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {
static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {
GPR_TIMER_SCOPE("pollset_set_del_fd", 0);
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd);
+ gpr_log(GPR_INFO, "PSS:%p: del fd %p", pss, fd);
}
pss = pss_lock_adam(pss);
size_t i;
@@ -1269,7 +1325,7 @@ static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {
static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
GPR_TIMER_SCOPE("pollset_set_del_pollset", 0);
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps);
+ gpr_log(GPR_INFO, "PSS:%p: del pollset %p", pss, ps);
}
pss = pss_lock_adam(pss);
size_t i;
@@ -1321,7 +1377,7 @@ static grpc_error* add_fds_to_pollsets(grpc_fd** fds, size_t fd_count,
static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
GPR_TIMER_SCOPE("pollset_set_add_pollset", 0);
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps);
+ gpr_log(GPR_INFO, "PSS:%p: add pollset %p", pss, ps);
}
grpc_error* error = GRPC_ERROR_NONE;
static const char* err_desc = "pollset_set_add_pollset";
@@ -1358,7 +1414,7 @@ static void pollset_set_add_pollset_set(grpc_pollset_set* a,
grpc_pollset_set* b) {
GPR_TIMER_SCOPE("pollset_set_add_pollset_set", 0);
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b);
+ gpr_log(GPR_INFO, "PSS: merge (%p, %p)", a, b);
}
grpc_error* error = GRPC_ERROR_NONE;
static const char* err_desc = "pollset_set_add_fd";
@@ -1392,7 +1448,7 @@ static void pollset_set_add_pollset_set(grpc_pollset_set* a,
GPR_SWAP(grpc_pollset_set*, a, b);
}
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PSS: parent %p to %p", b, a);
+ gpr_log(GPR_INFO, "PSS: parent %p to %p", b, a);
}
gpr_ref(&a->refs);
b->parent = a;