diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.cc')
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 37 |
1 files changed, 36 insertions, 1 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 416e8384b4..178ebd8977 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -199,6 +199,7 @@ struct grpc_pollset { pollable* active_pollable; bool kicked_without_poller; grpc_closure* shutdown_closure; + bool already_shutdown; grpc_pollset_worker* root_worker; int containing_pollset_set_count; }; @@ -560,8 +561,10 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) { } if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr && pollset->containing_pollset_set_count == 0) { + GPR_TIMER_MARK("pollset_finish_shutdown", 0); GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE); pollset->shutdown_closure = nullptr; + pollset->already_shutdown = true; } } @@ -569,6 +572,7 @@ static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) { * pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be * held */ static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) { + GPR_TIMER_SCOPE("kick_one_worker", 0); pollable* p = specific_worker->pollable_obj; grpc_core::mu_guard lock(&p->mu); GPR_ASSERT(specific_worker != nullptr); @@ -612,6 +616,7 @@ static grpc_error* kick_one_worker(grpc_pollset_worker* specific_worker) { static grpc_error* pollset_kick(grpc_pollset* pollset, grpc_pollset_worker* specific_worker) { + GPR_TIMER_SCOPE("pollset_kick", 0); GRPC_STATS_INC_POLLSET_KICK(); if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, @@ -661,6 +666,7 @@ static grpc_error* pollset_kick(grpc_pollset* pollset, } static grpc_error* pollset_kick_all(grpc_pollset* pollset) { + GPR_TIMER_SCOPE("pollset_kick_all", 0); grpc_error* error = GRPC_ERROR_NONE; const char* err_desc = "pollset_kick_all"; grpc_pollset_worker* w = pollset->root_worker; @@ -677,6 +683,11 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) { static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { gpr_mu_init(&pollset->mu); pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset"); + pollset->kicked_without_poller = false; + pollset->shutdown_closure = nullptr; + pollset->already_shutdown = false; + pollset->root_worker = nullptr; + pollset->containing_pollset_set_count = 0; *mu = &pollset->mu; } @@ -733,6 +744,7 @@ static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) { /* pollset->po.mu lock must be held by the caller before calling this */ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { + GPR_TIMER_SCOPE("pollset_shutdown", 0); GPR_ASSERT(pollset->shutdown_closure == nullptr); pollset->shutdown_closure = closure; GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset)); @@ -741,6 +753,7 @@ static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { static grpc_error* pollable_process_events(grpc_pollset* pollset, pollable* pollable_obj, bool drain) { + GPR_TIMER_SCOPE("pollable_process_events", 0); static const char* err_desc = "pollset_process_events"; grpc_error* error = GRPC_ERROR_NONE; for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && @@ -787,6 +800,7 @@ static void pollset_destroy(grpc_pollset* pollset) { } static grpc_error* pollable_epoll(pollable* p, grpc_millis deadline) { + GPR_TIMER_SCOPE("pollable_epoll", 0); int timeout = poll_deadline_to_millis_timeout(deadline); if (grpc_polling_trace.enabled()) { @@ -862,7 +876,9 @@ static worker_remove_result worker_remove(grpc_pollset_worker** root_worker, static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, grpc_pollset_worker** worker_hdl, grpc_millis deadline) { - bool do_poll = (pollset->shutdown_closure == nullptr); + GPR_TIMER_SCOPE("begin_worker", 0); + bool do_poll = + (pollset->shutdown_closure == nullptr && !pollset->already_shutdown); if (worker_hdl != nullptr) *worker_hdl = worker; worker->initialized_cv = false; worker->kicked = false; @@ -913,6 +929,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, grpc_pollset_worker** worker_hdl) { + GPR_TIMER_SCOPE("end_worker", 0); gpr_mu_lock(&pollset->mu); gpr_mu_lock(&worker->pollable_obj->mu); switch (worker_remove(&worker->pollable_obj->root_worker, worker, @@ -955,6 +972,7 @@ static long gettid(void) { return syscall(__NR_gettid); } static grpc_error* pollset_work(grpc_pollset* pollset, grpc_pollset_worker** worker_hdl, grpc_millis deadline) { + GPR_TIMER_SCOPE("pollset_work", 0); #ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP grpc_pollset_worker* worker = (grpc_pollset_worker*)gpr_malloc(sizeof(*worker)); @@ -1092,6 +1110,16 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset, case PO_EMPTY: POLLABLE_UNREF(pollset->active_pollable, "pollset"); error = pollable_create(PO_MULTI, &pollset->active_pollable); + /* Any workers currently polling on this pollset must now be woked up so + * that they can pick up the new active_pollable */ + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, + "PS:%p active pollable transition from empty to multi", + pollset); + } + static const char* err_desc = + "pollset_as_multipollable_locked: empty -> multi"; + append_error(&error, pollset_kick_all(pollset), err_desc); break; case PO_FD: gpr_mu_lock(&po_at_start->owner_fd->orphan_mu); @@ -1120,6 +1148,7 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset, } static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) { + GPR_TIMER_SCOPE("pollset_add_fd", 0); gpr_mu_lock(&pollset->mu); grpc_error* error = pollset_add_fd_locked(pollset, fd); gpr_mu_unlock(&pollset->mu); @@ -1168,6 +1197,7 @@ static void pollset_set_unref(grpc_pollset_set* pss) { } static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) { + GPR_TIMER_SCOPE("pollset_set_add_fd", 0); if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd); } @@ -1191,6 +1221,7 @@ static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) { } static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) { + GPR_TIMER_SCOPE("pollset_set_del_fd", 0); if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd); } @@ -1211,6 +1242,7 @@ static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) { } static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) { + GPR_TIMER_SCOPE("pollset_set_del_pollset", 0); if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps); } @@ -1241,6 +1273,7 @@ static grpc_error* add_fds_to_pollsets(grpc_fd** fds, size_t fd_count, size_t pollset_count, const char* err_desc, grpc_fd** out_fds, size_t* out_fd_count) { + GPR_TIMER_SCOPE("add_fds_to_pollsets", 0); grpc_error* error = GRPC_ERROR_NONE; for (size_t i = 0; i < fd_count; i++) { gpr_mu_lock(&fds[i]->orphan_mu); @@ -1261,6 +1294,7 @@ static grpc_error* add_fds_to_pollsets(grpc_fd** fds, size_t fd_count, } static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) { + GPR_TIMER_SCOPE("pollset_set_add_pollset", 0); if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps); } @@ -1297,6 +1331,7 @@ static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) { static void pollset_set_add_pollset_set(grpc_pollset_set* a, grpc_pollset_set* b) { + GPR_TIMER_SCOPE("pollset_set_add_pollset_set", 0); if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b); } |