diff options
author | kpayson64 <kpayson@google.com> | 2018-03-08 15:47:40 -0800 |
---|---|---|
committer | kpayson64 <kpayson@google.com> | 2018-04-30 17:40:59 -0700 |
commit | 38ab21ee0996a54c682488bcf43ecf5ba0f7f24a (patch) | |
tree | c65531bc89dd6916896c3dee0de1c6aab17819f1 /src/core/lib | |
parent | 8b875ac9413978370c4eafa2e2fb6a3b2f054297 (diff) |
Add exec_ctx check to fork handlers
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/gpr/fork.cc | 185 | ||||
-rw-r--r-- | src/core/lib/gpr/fork.h | 57 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd.h | 3 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd_posix.cc | 55 | ||||
-rw-r--r-- | src/core/lib/gprpp/thd_windows.cc | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.h | 12 | ||||
-rw-r--r-- | src/core/lib/iomgr/fork_posix.cc | 44 | ||||
-rw-r--r-- | src/core/lib/surface/init.cc | 6 |
8 files changed, 269 insertions, 100 deletions
diff --git a/src/core/lib/gpr/fork.cc b/src/core/lib/gpr/fork.cc index 812522b058..ec25848bd0 100644 --- a/src/core/lib/gpr/fork.cc +++ b/src/core/lib/gpr/fork.cc @@ -23,6 +23,8 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/useful.h" @@ -32,14 +34,120 @@ * AROUND VERY SPECIFIC USE CASES. */ -static int override_fork_support_enabled = -1; -static int fork_support_enabled; +// The exec_ctx_count has 2 modes, blocked and unblocked. +// When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates +// 0 active ExecCtxs, exex_ctx_count=3 indicates 1 active ExecCtxs... -void grpc_fork_support_init() { +// When blocked, the exec_ctx_count is 0-indexed. Note that ExecCtx +// creation can only be blocked if there is exactly 1 outstanding ExecCtx, +// meaning that BLOCKED and UNBLOCKED counts partition the integers +#define UNBLOCKED(n) (n + 2) +#define BLOCKED(n) (n) + +class ExecCtxState { + public: + ExecCtxState() : fork_complete_(true) { + gpr_mu_init(&mu_); + gpr_cv_init(&cv_); + gpr_atm_no_barrier_store(&count_, UNBLOCKED(0)); + } + + void IncExecCtxCount() { + intptr_t count = static_cast<intptr_t>( + gpr_atm_no_barrier_load(&count_)); + while (true) { + if (count <= BLOCKED(1)) { + // This only occurs if we are trying to fork. Wait until the fork() + // operation completes before allowing new ExecCtxs. + gpr_mu_lock(&mu_); + if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) { + while (!fork_complete_) { + gpr_cv_wait(&cv_, &mu_, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + } + gpr_mu_unlock(&mu_); + } else if (gpr_atm_no_barrier_cas(&count_, count, + count + 1)) { + break; + } + count = gpr_atm_no_barrier_load(&count_); + } + } + + void DecExecCtxCount() { + gpr_atm_no_barrier_fetch_add(&count_, -1); + } + + bool BlockExecCtx() { + // Assumes there is an active ExecCtx when this function is called + if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), + BLOCKED(1))) { + fork_complete_ = false; + return true; + } + return false; + } + + void AllowExecCtx() { + gpr_mu_lock(&mu_); + gpr_atm_no_barrier_store(&count_, UNBLOCKED(0)); + fork_complete_ = true; + gpr_cv_broadcast(&cv_); + gpr_mu_unlock(&g_mu); + } + + void ~ExecCtxState() { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&cv_); + } +} + +class ThreadState { + public: + ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) { + gpr_mu_init(&mu_); + gpr_cv_init(&cv_); + } + + void IncThreadCount() { + gpr_mu_lock(&mu_); + count_++; + gpr_mu_unlock(&mu_); + } + + void DecThreadCount() { + gpr_mu_lock(&mu_); + count_--; + if (awaiting_threads_ && count_ == 0) { + threads_done = true; + gpr_cv_signal(&cv_); + } + gpr_mu_unlock(&mu_); + } + void AwaitThreads() { + gpr_mu_lock(&mu_); + awaiting_threads_ = true; + threads_done_ = (count_ == 0); + while (!threads_done_) { + gpr_cv_wait(&cv_, &mu_, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + awaiting_threads_ = true; + gpr_mu_unlock(&mu_); + } + + ~ThreadState() { + gpr_mu_destroy(&mu_); + gpr_cv_destroy(&cv_); + } +} + +static void Fork::GlobalInit() { #ifdef GRPC_ENABLE_FORK_SUPPORT - fork_support_enabled = 1; + bool supportEnabled_ = true; #else - fork_support_enabled = 0; + bool supportEnabled_ = false; #endif bool env_var_set = false; char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT"); @@ -50,7 +158,7 @@ void grpc_fork_support_init() { "False", "FALSE", "0"}; for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { if (0 == strcmp(env, truthy[i])) { - fork_support_enabled = 1; + supportEnabled_ = true; env_var_set = true; break; } @@ -58,7 +166,7 @@ void grpc_fork_support_init() { if (!env_var_set) { for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) { if (0 == strcmp(env, falsey[i])) { - fork_support_enabled = 0; + supportEnabled_ = false; env_var_set = true; break; } @@ -66,13 +174,66 @@ void grpc_fork_support_init() { } gpr_free(env); } - if (override_fork_support_enabled != -1) { - fork_support_enabled = override_fork_support_enabled; + if (overrideEnabled_ != -1) { + supportEnabled_ = (overrideEnabled_ == 1); + } + if (supportEnabled_) { + execCtxState_ = grpc_core::New<ExecCtxState>(); + threadState_ = grpc_core::New<ThreadState>(); } } -int grpc_fork_support_enabled() { return fork_support_enabled; } + static void Fork::GlobalShutdown() { + if (supportEnabled_) { + grpc_core::Delete(execCtxState_); + grpc_core::Delete(threadState_); + } + } + + static bool Fork::Enabled() { + return supportEnabled_; + } + + // Testing Only + static void Fork::Enable(bool enable) { + overrideEnabled_ = enable ? 1 : 0; + } + + static void Fork::IncExecCtxCount() { + if(supportEnabled_) { + execCtxState->IncExecCtxCount(); + } + } + + static void Fork::DecExecCtxCount() { + if(supportEnabled_) { + execCtxState->DecExecCtxCount(); + } + } + + static bool Fork::BlockExecCtx() { + if(supportEnabled_) { + return execCtxState->BlockExecCtx(); + } + return false; + } + + static void Fork::AllowExecCtx() { + execCtxState->AllowExecCtx(); + } + + static void Fork::IncThreadCount() { + threadState->IncThreadCount(); + } + + static void Fork::DecThreadCount() { + threadState_->DecThreadCount(); + } + static void Fork::AwaitThreads() { + threadState_->AwaitThreads(); + } -void grpc_enable_fork_support(int enable) { - override_fork_support_enabled = enable; +private: + ExecCtxState* execCtxState_; + ThreadState* threadState_; } diff --git a/src/core/lib/gpr/fork.h b/src/core/lib/gpr/fork.h index 94c61bb836..bf5acc3292 100644 --- a/src/core/lib/gpr/fork.h +++ b/src/core/lib/gpr/fork.h @@ -24,12 +24,59 @@ * AROUND VERY SPECIFIC USE CASES. */ -void grpc_fork_support_init(void); +namespace grpc_core { -int grpc_fork_support_enabled(void); +namespace { + class ExecCtxState; + class ThreadState; +} -// Test only: Must be called before grpc_init(), and overrides -// environment variables/compile flags -void grpc_enable_fork_support(int enable); +namespace internal { + +class ForkSupport { + public: + static void GlobalInit(); + static void GlobalShutdown(); + + // Returns true if fork suppport is enabled, false otherwise + static bool Enabled(); + + // Increment the count of active ExecCtxs. + // Will block until a pending fork is complete if one is in progress. + void IncExecCtxCount(); + + // Decrement the count of active ExecCtxs + void DecExecCtxCount(); + + // Check if there is a single active ExecCtx + // (the one used to invoke this function). If there are more, + // return false. Otherwise, return true and block creation of + // more ExecCtx s until AlloWExecCtx() is called + // + bool BlockExecCtx(); + void AllowExecCtx(); + + // Increment the count of active threads. + void IncThreadCount(); + + // Decrement the count of active threads. + void DecThreadCount(); + + // Await all core threads to be joined. + void AwaitThreads(); + + // Test only: overrides environment variables/compile flags + // Must be called before grpc_init() + void Enable(bool enable); + + private: + static ExecCtxState* execCtxState_ = nullptr; + static ThreadState* threadState_ = nullptr; + static bool supportEnabled_ = false; + static int overrideEnabled_ = -1; +} + +} // namespace internal +} // namespace grpc_core #endif /* GRPC_CORE_LIB_GPR_FORK_H */ diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h index 05c7ded45f..caf0652c1a 100644 --- a/src/core/lib/gprpp/thd.h +++ b/src/core/lib/gprpp/thd.h @@ -111,9 +111,6 @@ class Thread { } }; - static void Init(); - static bool AwaitAll(gpr_timespec deadline); - private: Thread(const Thread&) = delete; Thread& operator=(const Thread&) = delete; diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc index 2f6c2edcae..1a20a0bbc3 100644 --- a/src/core/lib/gprpp/thd_posix.cc +++ b/src/core/lib/gprpp/thd_posix.cc @@ -38,11 +38,6 @@ namespace grpc_core { namespace { -gpr_mu g_mu; -gpr_cv g_cv; -int g_thread_count; -int g_awaiting_threads; - class ThreadInternalsPosix; struct thd_arg { ThreadInternalsPosix* thread; @@ -68,7 +63,7 @@ class ThreadInternalsPosix info->body = thd_body; info->arg = arg; info->name = thd_name; - inc_thd_count(); + grpc_fork_inc_thd_count(); GPR_ASSERT(pthread_attr_init(&attr) == 0); GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) == @@ -103,7 +98,7 @@ class ThreadInternalsPosix gpr_mu_unlock(&arg.thread->mu_); (*arg.body)(arg.arg); - dec_thd_count(); + grpc_fork_dec_thd_count(); return nullptr; }, info) == 0); @@ -113,7 +108,7 @@ class ThreadInternalsPosix if (!success) { /* don't use gpr_free, as this was allocated using malloc (see above) */ free(info); - dec_thd_count(); + grpc_fork_dec_thd_count(); } }; @@ -132,29 +127,6 @@ class ThreadInternalsPosix void Join() override { pthread_join(pthread_id_, nullptr); } private: - /***************************************** - * Only used when fork support is enabled - */ - - static void inc_thd_count() { - if (grpc_fork_support_enabled()) { - gpr_mu_lock(&g_mu); - g_thread_count++; - gpr_mu_unlock(&g_mu); - } - } - - static void dec_thd_count() { - if (grpc_fork_support_enabled()) { - gpr_mu_lock(&g_mu); - g_thread_count--; - if (g_awaiting_threads && g_thread_count == 0) { - gpr_cv_signal(&g_cv); - } - gpr_mu_unlock(&g_mu); - } - } - gpr_mu mu_; gpr_cv ready_; bool started_; @@ -180,27 +152,6 @@ Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, *success = outcome; } } - -void Thread::Init() { - gpr_mu_init(&g_mu); - gpr_cv_init(&g_cv); - g_thread_count = 0; - g_awaiting_threads = 0; -} - -bool Thread::AwaitAll(gpr_timespec deadline) { - gpr_mu_lock(&g_mu); - g_awaiting_threads = 1; - int res = 0; - while ((g_thread_count > 0) && - (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0)) { - res = gpr_cv_wait(&g_cv, &g_mu, deadline); - } - g_awaiting_threads = 0; - gpr_mu_unlock(&g_mu); - return res == 0; -} - } // namespace grpc_core // The following is in the external namespace as it is exposed as C89 API diff --git a/src/core/lib/gprpp/thd_windows.cc b/src/core/lib/gprpp/thd_windows.cc index 59ea02f3d2..71584fd358 100644 --- a/src/core/lib/gprpp/thd_windows.cc +++ b/src/core/lib/gprpp/thd_windows.cc @@ -131,13 +131,6 @@ class ThreadInternalsWindows namespace grpc_core { -void Thread::Init() {} - -bool Thread::AwaitAll(gpr_timespec deadline) { - // TODO: Consider adding this if needed - return false; -} - Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg, bool* success) { bool outcome = false; diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index de97164f02..b5b59621f9 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -25,6 +25,7 @@ #include <grpc/support/cpu.h> #include <grpc/support/log.h> +#include "src/core/lib/gpr/fork.h" #include "src/core/lib/gpr/tls.h" #include "src/core/lib/iomgr/closure.h" @@ -76,16 +77,23 @@ class ExecCtx { public: /** Default Constructor */ - ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) { Set(this); } + ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) { + grpc_fork_inc_exec_ctx_count(); + Set(this); + } /** Parameterised Constructor */ - ExecCtx(uintptr_t fl) : flags_(fl) { Set(this); } + ExecCtx(uintptr_t fl) : flags_(fl) { + grpc_fork_inc_exec_ctx_count(); + Set(this); + } /** Destructor */ virtual ~ExecCtx() { flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED; Flush(); Set(last_exec_ctx_); + grpc_fork_dec_exec_ctx_count(); } /** Disallow copy and assignment operators */ diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc index f8645ab157..dd0f9f612f 100644 --- a/src/core/lib/iomgr/fork_posix.cc +++ b/src/core/lib/iomgr/fork_posix.cc @@ -41,47 +41,59 @@ * AROUND VERY SPECIFIC USE CASES. */ +namespace { +bool skipped_handler = true; +bool registered_handlers = false; +} // namespace + void grpc_prefork() { + grpc_core::ExecCtx exec_ctx; + skipped_handler = true; + if (!grpc_is_initialized()) { + return; + } if (!grpc_fork_support_enabled()) { gpr_log(GPR_ERROR, "Fork support not enabled; try running with the " "environment variable GRPC_ENABLE_FORK_SUPPORT=1"); return; } - if (grpc_is_initialized()) { - grpc_core::ExecCtx exec_ctx; - grpc_timer_manager_set_threading(false); - grpc_executor_set_threading(false); - grpc_core::ExecCtx::Get()->Flush(); - if (!grpc_core::Thread::AwaitAll( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(3, GPR_TIMESPAN)))) { - gpr_log(GPR_ERROR, "gRPC thread still active! Cannot fork!"); - } + if (!grpc_fork_block_exec_ctx()) { + gpr_log(GPR_INFO, + "Other threads are currently calling into gRPC, skipping fork() " + "handlers"); + return; } + grpc_timer_manager_set_threading(false); + grpc_executor_set_threading(false); + grpc_core::ExecCtx::Get()->Flush(); + grpc_fork_await_thds(); + skipped_handler = false; } void grpc_postfork_parent() { - if (grpc_is_initialized()) { - grpc_timer_manager_set_threading(true); + if (!skipped_handler) { + grpc_fork_allow_exec_ctx(); grpc_core::ExecCtx exec_ctx; + grpc_timer_manager_set_threading(true); grpc_executor_set_threading(true); } } void grpc_postfork_child() { - if (grpc_is_initialized()) { - grpc_timer_manager_set_threading(true); + if (!skipped_handler) { + grpc_fork_allow_exec_ctx(); grpc_core::ExecCtx exec_ctx; + grpc_timer_manager_set_threading(true); grpc_executor_set_threading(true); - grpc_core::ExecCtx::Get()->Flush(); } } void grpc_fork_handlers_auto_register() { - if (grpc_fork_support_enabled()) { + if (grpc_fork_support_enabled() & !registered_handlers) { #ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK pthread_atfork(grpc_prefork, grpc_postfork_parent, grpc_postfork_child); + registered_handlers = true; #endif // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK } } diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index ac9f9e6066..285e6f7ddc 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -64,12 +64,10 @@ static int g_initializations; static void do_basic_init(void) { gpr_log_verbosity_init(); - grpc_fork_support_init(); gpr_mu_init(&g_init_mu); grpc_register_built_in_plugins(); grpc_cq_global_init(); g_initializations = 0; - grpc_fork_handlers_auto_register(); } static bool append_filter(grpc_channel_stack_builder* builder, void* arg) { @@ -122,8 +120,9 @@ void grpc_init(void) { gpr_mu_lock(&g_init_mu); if (++g_initializations == 1) { + grpc_fork_support_init(); + grpc_fork_handlers_auto_register(); gpr_time_init(); - grpc_core::Thread::Init(); grpc_stats_init(); grpc_slice_intern_init(); grpc_mdctx_global_init(); @@ -177,6 +176,7 @@ void grpc_shutdown(void) { grpc_handshaker_factory_registry_shutdown(); grpc_slice_intern_shutdown(); grpc_stats_shutdown(); + grpc_fork_support_destroy(); } grpc_core::ExecCtx::GlobalShutdown(); } |