aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/ev_epollex_linux.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.cc')
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc37
1 files changed, 36 insertions, 1 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 416e8384b4..178ebd8977 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -199,6 +199,7 @@ struct grpc_pollset {
pollable* active_pollable;
bool kicked_without_poller;
grpc_closure* shutdown_closure;
+ bool already_shutdown;
grpc_pollset_worker* root_worker;
int containing_pollset_set_count;
};
@@ -560,8 +561,10 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
}
if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
pollset->containing_pollset_set_count == 0) {
+ GPR_TIMER_MARK("pollset_finish_shutdown", 0);
GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE);
pollset->shutdown_closure = nullptr;
+ pollset->already_shutdown = true;
}
}
@@ -569,6 +572,7 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
* pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be
* held */
static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
+ GPR_TIMER_SCOPE("kick_one_worker", 0);
pollable* p = specific_worker->pollable_obj;
grpc_core::mu_guard lock(&p->mu);
GPR_ASSERT(specific_worker != nullptr);
@@ -612,6 +616,7 @@ static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) {
static grpc_error* pollset_kick(grpc_pollset* pollset,
grpc_pollset_worker* specific_worker) {
+ GPR_TIMER_SCOPE("pollset_kick", 0);
GRPC_STATS_INC_POLLSET_KICK();
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG,
@@ -661,6 +666,7 @@ static grpc_error* pollset_kick(grpc_pollset* pollset,
}
static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
+ GPR_TIMER_SCOPE("pollset_kick_all", 0);
grpc_error* error = GRPC_ERROR_NONE;
const char* err_desc = "pollset_kick_all";
grpc_pollset_worker* w = pollset->root_worker;
@@ -677,6 +683,11 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
gpr_mu_init(&pollset->mu);
pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset");
+ pollset->kicked_without_poller = false;
+ pollset->shutdown_closure = nullptr;
+ pollset->already_shutdown = false;
+ pollset->root_worker = nullptr;
+ pollset->containing_pollset_set_count = 0;
*mu = &pollset->mu;
}
@@ -733,6 +744,7 @@ static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) {
/* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
+ GPR_TIMER_SCOPE("pollset_shutdown", 0);
GPR_ASSERT(pollset->shutdown_closure == nullptr);
pollset->shutdown_closure = closure;
GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
@@ -741,6 +753,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
static grpc_error* pollable_process_events(grpc_pollset* pollset,
pollable* pollable_obj, bool drain) {
+ GPR_TIMER_SCOPE("pollable_process_events", 0);
static const char* err_desc = "pollset_process_events";
grpc_error* error = GRPC_ERROR_NONE;
for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) &&
@@ -787,6 +800,7 @@ static void pollset_destroy(grpc_pollset* pollset) {
}
static grpc_error* pollable_epoll(pollable* p, grpc_millis deadline) {
+ GPR_TIMER_SCOPE("pollable_epoll", 0);
int timeout = poll_deadline_to_millis_timeout(deadline);
if (grpc_polling_trace.enabled()) {
@@ -862,7 +876,9 @@ static worker_remove_result worker_remove(grpc_pollset_worker** root_worker,
static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
grpc_pollset_worker** worker_hdl,
grpc_millis deadline) {
- bool do_poll = (pollset->shutdown_closure == nullptr);
+ GPR_TIMER_SCOPE("begin_worker", 0);
+ bool do_poll =
+ (pollset->shutdown_closure == nullptr && !pollset->already_shutdown);
if (worker_hdl != nullptr) *worker_hdl = worker;
worker->initialized_cv = false;
worker->kicked = false;
@@ -913,6 +929,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
grpc_pollset_worker** worker_hdl) {
+ GPR_TIMER_SCOPE("end_worker", 0);
gpr_mu_lock(&pollset->mu);
gpr_mu_lock(&worker->pollable_obj->mu);
switch (worker_remove(&worker->pollable_obj->root_worker, worker,
@@ -955,6 +972,7 @@ static long gettid(void) { return syscall(__NR_gettid); }
static grpc_error* pollset_work(grpc_pollset* pollset,
grpc_pollset_worker** worker_hdl,
grpc_millis deadline) {
+ GPR_TIMER_SCOPE("pollset_work", 0);
#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP
grpc_pollset_worker* worker =
(grpc_pollset_worker*)gpr_malloc(sizeof(*worker));
@@ -1092,6 +1110,16 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset,
case PO_EMPTY:
POLLABLE_UNREF(pollset->active_pollable, "pollset");
error = pollable_create(PO_MULTI, &pollset->active_pollable);
+ /* Any workers currently polling on this pollset must now be woked up so
+ * that they can pick up the new active_pollable */
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "PS:%p active pollable transition from empty to multi",
+ pollset);
+ }
+ static const char* err_desc =
+ "pollset_as_multipollable_locked: empty -> multi";
+ append_error(&error, pollset_kick_all(pollset), err_desc);
break;
case PO_FD:
gpr_mu_lock(&po_at_start->owner_fd->orphan_mu);
@@ -1120,6 +1148,7 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset,
}
static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
+ GPR_TIMER_SCOPE("pollset_add_fd", 0);
gpr_mu_lock(&pollset->mu);
grpc_error* error = pollset_add_fd_locked(pollset, fd);
gpr_mu_unlock(&pollset->mu);
@@ -1168,6 +1197,7 @@ static void pollset_set_unref(grpc_pollset_set* pss) {
}
static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {
+ GPR_TIMER_SCOPE("pollset_set_add_fd", 0);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd);
}
@@ -1191,6 +1221,7 @@ static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {
}
static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {
+ GPR_TIMER_SCOPE("pollset_set_del_fd", 0);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd);
}
@@ -1211,6 +1242,7 @@ static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {
}
static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
+ GPR_TIMER_SCOPE("pollset_set_del_pollset", 0);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps);
}
@@ -1241,6 +1273,7 @@ static grpc_error* add_fds_to_pollsets(grpc_fd** fds, size_t fd_count,
size_t pollset_count,
const char* err_desc, grpc_fd** out_fds,
size_t* out_fd_count) {
+ GPR_TIMER_SCOPE("add_fds_to_pollsets", 0);
grpc_error* error = GRPC_ERROR_NONE;
for (size_t i = 0; i < fd_count; i++) {
gpr_mu_lock(&fds[i]->orphan_mu);
@@ -1261,6 +1294,7 @@ static grpc_error* add_fds_to_pollsets(grpc_fd** fds, size_t fd_count,
}
static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
+ GPR_TIMER_SCOPE("pollset_set_add_pollset", 0);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps);
}
@@ -1297,6 +1331,7 @@ static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
static void pollset_set_add_pollset_set(grpc_pollset_set* a,
grpc_pollset_set* b) {
+ GPR_TIMER_SCOPE("pollset_set_add_pollset_set", 0);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b);
}