diff options
-rw-r--r-- | include/grpc/support/sync.h | 18 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 85 |
2 files changed, 60 insertions, 43 deletions
diff --git a/include/grpc/support/sync.h b/include/grpc/support/sync.h index fe8a59a5d6..3e3f1925ce 100644 --- a/include/grpc/support/sync.h +++ b/include/grpc/support/sync.h @@ -274,7 +274,23 @@ GPRAPI intptr_t gpr_stats_read(const gpr_stats_counter *c); #endif /* 0 */ #ifdef __cplusplus -} +} // extern "C" + +namespace grpc_core { + +class mu_guard { +public: + mu_guard(gpr_mu *mu) : mu_(mu) { gpr_mu_lock(mu); } + ~mu_guard() { gpr_mu_unlock(mu_); } + + mu_guard(const mu_guard&) = delete; + mu_guard& operator=(const mu_guard&) = delete; + +private: + gpr_mu* const mu_; +}; + +} // namespace grpc_core #endif #endif /* GRPC_SUPPORT_SYNC_H */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 8ec0947946..bc2e665f06 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -24,6 +24,7 @@ #include "src/core/lib/iomgr/ev_epollex_linux.h" #include <assert.h> +#include <sys/syscall.h> #include <errno.h> #include <limits.h> #include <poll.h> @@ -165,6 +166,7 @@ typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks; struct grpc_pollset_worker { bool kicked; bool initialized_cv; + pid_t originator; gpr_cv cv; grpc_pollset *pollset; pollable *pollable_obj; @@ -181,6 +183,7 @@ struct grpc_pollset { bool kicked_without_poller; grpc_closure *shutdown_closure; grpc_pollset_worker *root_worker; + int containing_pollset_set_count; int event_cursor; int event_count; @@ -542,39 +545,42 @@ static void pollset_global_shutdown(void) { static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) { + if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && pollset->containing_pollset_set_count == 0) { GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); pollset->shutdown_closure = NULL; } } -/* both pollset->active_pollable->mu, pollset->mu must be held before calling - * this function */ +/* pollset->mu must be held before calling this function, pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be held */ static grpc_error *pollset_kick_one(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { pollable *p = specific_worker->pollable_obj; + grpc_core::mu_guard lock(&p->mu); GPR_ASSERT(specific_worker != NULL); if (specific_worker->kicked) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p); } return GRPC_ERROR_NONE; - } else if (gpr_tls_get(&g_current_thread_worker) == + } + if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p); } specific_worker->kicked = true; return GRPC_ERROR_NONE; - } else if (specific_worker == p->root_worker) { + } +if (specific_worker == p->root_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p); } specific_worker->kicked = true; - return grpc_wakeup_fd_wakeup(&p->wakeup); - } else { - GPR_ASSERT(specific_worker->initialized_cv); + grpc_error *error = grpc_wakeup_fd_wakeup(&p->wakeup); + return error; + } +if (specific_worker->initialized_cv) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p); } @@ -582,13 +588,12 @@ static grpc_error *pollset_kick_one(grpc_exec_ctx *exec_ctx, gpr_cv_signal(&specific_worker->cv); return GRPC_ERROR_NONE; } + // we can get here during end_worker after removing specific_worker from the pollable list but before removing it from the pollset list + return GRPC_ERROR_NONE; } -/* both pollset->active_pollable->mu, pollset->mu must be held before calling - * this function */ -static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *specific_worker) { +static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p", @@ -619,21 +624,10 @@ static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx, } } -static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *specific_worker) { - pollable *p = pollset->active_pollable; - gpr_mu_lock(&p->mu); - grpc_error *error = pollset_kick_inner(exec_ctx, pollset, specific_worker); - gpr_mu_unlock(&p->mu); - return error; -} - static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollable *p = pollset->active_pollable; grpc_error *error = GRPC_ERROR_NONE; const char *err_desc = "pollset_kick_all"; - gpr_mu_lock(&p->mu); grpc_pollset_worker *w = pollset->root_worker; if (w != NULL) { do { @@ -641,7 +635,6 @@ static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx, w = w->links[PWLINK_POLLSET].next; } while (w != pollset->root_worker); } - gpr_mu_unlock(&p->mu); return error; } @@ -855,6 +848,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, PWLINK_POLLABLE)) { worker->initialized_cv = true; gpr_cv_init(&worker->cv); + gpr_mu_unlock(&pollset->mu); if (GRPC_TRACER_ON(grpc_polling_trace) && worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, @@ -882,11 +876,13 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } grpc_exec_ctx_invalidate_now(exec_ctx); + } else { + gpr_mu_unlock(&pollset->mu); } gpr_mu_unlock(&worker->pollable_obj->mu); - return do_poll && pollset->shutdown_closure == NULL && - pollset->active_pollable == worker->pollable_obj; + return do_poll; +// && pollset->shutdown_closure == NULL && pollset->active_pollable == worker->pollable_obj; } static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -899,18 +895,19 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GPR_ASSERT(new_root->initialized_cv); gpr_cv_signal(&new_root->cv); } + gpr_mu_unlock(&worker->pollable_obj->mu); + POLLABLE_UNREF(worker->pollable_obj, "pollset_worker"); + gpr_mu_lock(&pollset->mu); + if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET) == WRR_EMPTIED) { + pollset_maybe_finish_shutdown(exec_ctx, pollset); + } if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } - if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET)) { - gpr_mu_unlock(&worker->pollable_obj->mu); - pollset_maybe_finish_shutdown(exec_ctx, pollset); - } else { - gpr_mu_unlock(&worker->pollable_obj->mu); - } - POLLABLE_UNREF(worker->pollable_obj, "pollset_worker"); } +static long gettid(void) { return syscall(__NR_gettid); } + /* pollset->po.mu lock must be held by the caller before calling this. The function pollset_work() may temporarily release the lock (pollset->po.mu) during the course of its execution but it will always re-acquire the lock and @@ -926,6 +923,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker worker; #define WORKER_PTR (&worker) #endif + WORKER_PTR->originator = gettid(); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR " kwp=%d pollable=%p", @@ -940,8 +938,6 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR); - GPR_ASSERT(!pollset->shutdown_closure); - gpr_mu_unlock(&pollset->mu); if (pollset->event_cursor == pollset->event_count) { append_error(&error, pollset_epoll(exec_ctx, pollset, WORKER_PTR->pollable_obj, deadline), @@ -950,7 +946,6 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, append_error(&error, pollset_process_events(exec_ctx, pollset, false), err_desc); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); gpr_tls_set(&g_current_thread_pollset, 0); gpr_tls_set(&g_current_thread_worker, 0); } @@ -1044,11 +1039,10 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, return error; } -static grpc_error *pollset_as_multipollable(grpc_exec_ctx *exec_ctx, +static grpc_error *pollset_as_multipollable_locked(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollable **pollable_obj) { grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_lock(&pollset->mu); pollable *po_at_start = POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable"); switch (pollset->active_pollable->type) { @@ -1079,7 +1073,6 @@ static grpc_error *pollset_as_multipollable(grpc_exec_ctx *exec_ctx, *pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set"); POLLABLE_UNREF(po_at_start, "pollset_as_multipollable"); } - gpr_mu_unlock(&pollset->mu); return error; } @@ -1191,6 +1184,11 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, } pss->pollset_count--; gpr_mu_unlock(&pss->mu); + gpr_mu_lock(&ps->mu); + if (0 == --ps->containing_pollset_set_count) { + pollset_maybe_finish_shutdown(exec_ctx, ps); + } + gpr_mu_unlock(&ps->mu); } // add all fds to pollables, and output a new array of unorphaned out_fds @@ -1224,11 +1222,15 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollset_set_add_pollset"; pollable *pollable_obj = NULL; +gpr_mu_lock(&ps->mu); if (!GRPC_LOG_IF_ERROR( - err_desc, pollset_as_multipollable(exec_ctx, ps, &pollable_obj))) { + err_desc, pollset_as_multipollable_locked(exec_ctx, ps, &pollable_obj))) { GPR_ASSERT(pollable_obj == NULL); +gpr_mu_unlock(&ps->mu); return; } +ps->containing_pollset_set_count++; +gpr_mu_unlock(&ps->mu); pss = pss_lock_adam(pss); size_t initial_fd_count = pss->fd_count; pss->fd_count = 0; @@ -1311,7 +1313,6 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, } memcpy(a->pollsets + a->pollset_count, b->pollsets, b->pollset_count * sizeof(*b->pollsets)); - a->fd_count += b->fd_count; a->pollset_count += b->pollset_count; gpr_free(b->fds); gpr_free(b->pollsets); |