aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar kpayson64 <kpayson@google.com>2018-03-08 15:47:40 -0800
committerGravatar kpayson64 <kpayson@google.com>2018-04-30 17:40:59 -0700
commit38ab21ee0996a54c682488bcf43ecf5ba0f7f24a (patch)
treec65531bc89dd6916896c3dee0de1c6aab17819f1 /src/core
parent8b875ac9413978370c4eafa2e2fb6a3b2f054297 (diff)
Add exec_ctx check to fork handlers
Diffstat (limited to 'src/core')
-rw-r--r--src/core/lib/gpr/fork.cc185
-rw-r--r--src/core/lib/gpr/fork.h57
-rw-r--r--src/core/lib/gprpp/thd.h3
-rw-r--r--src/core/lib/gprpp/thd_posix.cc55
-rw-r--r--src/core/lib/gprpp/thd_windows.cc7
-rw-r--r--src/core/lib/iomgr/exec_ctx.h12
-rw-r--r--src/core/lib/iomgr/fork_posix.cc44
-rw-r--r--src/core/lib/surface/init.cc6
8 files changed, 269 insertions, 100 deletions
diff --git a/src/core/lib/gpr/fork.cc b/src/core/lib/gpr/fork.cc
index 812522b058..ec25848bd0 100644
--- a/src/core/lib/gpr/fork.cc
+++ b/src/core/lib/gpr/fork.cc
@@ -23,6 +23,8 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/useful.h"
@@ -32,14 +34,120 @@
* AROUND VERY SPECIFIC USE CASES.
*/
-static int override_fork_support_enabled = -1;
-static int fork_support_enabled;
+// The exec_ctx_count has 2 modes, blocked and unblocked.
+// When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
+// 0 active ExecCtxs, exex_ctx_count=3 indicates 1 active ExecCtxs...
-void grpc_fork_support_init() {
+// When blocked, the exec_ctx_count is 0-indexed. Note that ExecCtx
+// creation can only be blocked if there is exactly 1 outstanding ExecCtx,
+// meaning that BLOCKED and UNBLOCKED counts partition the integers
+#define UNBLOCKED(n) (n + 2)
+#define BLOCKED(n) (n)
+
+class ExecCtxState {
+ public:
+ ExecCtxState() : fork_complete_(true) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&cv_);
+ gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
+ }
+
+ void IncExecCtxCount() {
+ intptr_t count = static_cast<intptr_t>(
+ gpr_atm_no_barrier_load(&count_));
+ while (true) {
+ if (count <= BLOCKED(1)) {
+ // This only occurs if we are trying to fork. Wait until the fork()
+ // operation completes before allowing new ExecCtxs.
+ gpr_mu_lock(&mu_);
+ if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
+ while (!fork_complete_) {
+ gpr_cv_wait(&cv_, &mu_,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ }
+ gpr_mu_unlock(&mu_);
+ } else if (gpr_atm_no_barrier_cas(&count_, count,
+ count + 1)) {
+ break;
+ }
+ count = gpr_atm_no_barrier_load(&count_);
+ }
+ }
+
+ void DecExecCtxCount() {
+ gpr_atm_no_barrier_fetch_add(&count_, -1);
+ }
+
+ bool BlockExecCtx() {
+ // Assumes there is an active ExecCtx when this function is called
+ if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1),
+ BLOCKED(1))) {
+ fork_complete_ = false;
+ return true;
+ }
+ return false;
+ }
+
+ void AllowExecCtx() {
+ gpr_mu_lock(&mu_);
+ gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
+ fork_complete_ = true;
+ gpr_cv_broadcast(&cv_);
+ gpr_mu_unlock(&g_mu);
+ }
+
+ void ~ExecCtxState() {
+ gpr_mu_destroy(&mu_);
+ gpr_cv_destroy(&cv_);
+ }
+}
+
+class ThreadState {
+ public:
+ ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&cv_);
+ }
+
+ void IncThreadCount() {
+ gpr_mu_lock(&mu_);
+ count_++;
+ gpr_mu_unlock(&mu_);
+ }
+
+ void DecThreadCount() {
+ gpr_mu_lock(&mu_);
+ count_--;
+ if (awaiting_threads_ && count_ == 0) {
+ threads_done = true;
+ gpr_cv_signal(&cv_);
+ }
+ gpr_mu_unlock(&mu_);
+ }
+ void AwaitThreads() {
+ gpr_mu_lock(&mu_);
+ awaiting_threads_ = true;
+ threads_done_ = (count_ == 0);
+ while (!threads_done_) {
+ gpr_cv_wait(&cv_, &mu_,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ awaiting_threads_ = true;
+ gpr_mu_unlock(&mu_);
+ }
+
+ ~ThreadState() {
+ gpr_mu_destroy(&mu_);
+ gpr_cv_destroy(&cv_);
+ }
+}
+
+static void Fork::GlobalInit() {
#ifdef GRPC_ENABLE_FORK_SUPPORT
- fork_support_enabled = 1;
+ bool supportEnabled_ = true;
#else
- fork_support_enabled = 0;
+ bool supportEnabled_ = false;
#endif
bool env_var_set = false;
char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT");
@@ -50,7 +158,7 @@ void grpc_fork_support_init() {
"False", "FALSE", "0"};
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == strcmp(env, truthy[i])) {
- fork_support_enabled = 1;
+ supportEnabled_ = true;
env_var_set = true;
break;
}
@@ -58,7 +166,7 @@ void grpc_fork_support_init() {
if (!env_var_set) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) {
if (0 == strcmp(env, falsey[i])) {
- fork_support_enabled = 0;
+ supportEnabled_ = false;
env_var_set = true;
break;
}
@@ -66,13 +174,66 @@ void grpc_fork_support_init() {
}
gpr_free(env);
}
- if (override_fork_support_enabled != -1) {
- fork_support_enabled = override_fork_support_enabled;
+ if (overrideEnabled_ != -1) {
+ supportEnabled_ = (overrideEnabled_ == 1);
+ }
+ if (supportEnabled_) {
+ execCtxState_ = grpc_core::New<ExecCtxState>();
+ threadState_ = grpc_core::New<ThreadState>();
}
}
-int grpc_fork_support_enabled() { return fork_support_enabled; }
+ static void Fork::GlobalShutdown() {
+ if (supportEnabled_) {
+ grpc_core::Delete(execCtxState_);
+ grpc_core::Delete(threadState_);
+ }
+ }
+
+ static bool Fork::Enabled() {
+ return supportEnabled_;
+ }
+
+ // Testing Only
+ static void Fork::Enable(bool enable) {
+ overrideEnabled_ = enable ? 1 : 0;
+ }
+
+ static void Fork::IncExecCtxCount() {
+ if(supportEnabled_) {
+ execCtxState->IncExecCtxCount();
+ }
+ }
+
+ static void Fork::DecExecCtxCount() {
+ if(supportEnabled_) {
+ execCtxState->DecExecCtxCount();
+ }
+ }
+
+ static bool Fork::BlockExecCtx() {
+ if(supportEnabled_) {
+ return execCtxState->BlockExecCtx();
+ }
+ return false;
+ }
+
+ static void Fork::AllowExecCtx() {
+ execCtxState->AllowExecCtx();
+ }
+
+ static void Fork::IncThreadCount() {
+ threadState->IncThreadCount();
+ }
+
+ static void Fork::DecThreadCount() {
+ threadState_->DecThreadCount();
+ }
+ static void Fork::AwaitThreads() {
+ threadState_->AwaitThreads();
+ }
-void grpc_enable_fork_support(int enable) {
- override_fork_support_enabled = enable;
+private:
+ ExecCtxState* execCtxState_;
+ ThreadState* threadState_;
}
diff --git a/src/core/lib/gpr/fork.h b/src/core/lib/gpr/fork.h
index 94c61bb836..bf5acc3292 100644
--- a/src/core/lib/gpr/fork.h
+++ b/src/core/lib/gpr/fork.h
@@ -24,12 +24,59 @@
* AROUND VERY SPECIFIC USE CASES.
*/
-void grpc_fork_support_init(void);
+namespace grpc_core {
-int grpc_fork_support_enabled(void);
+namespace {
+ class ExecCtxState;
+ class ThreadState;
+}
-// Test only: Must be called before grpc_init(), and overrides
-// environment variables/compile flags
-void grpc_enable_fork_support(int enable);
+namespace internal {
+
+class ForkSupport {
+ public:
+ static void GlobalInit();
+ static void GlobalShutdown();
+
+ // Returns true if fork suppport is enabled, false otherwise
+ static bool Enabled();
+
+ // Increment the count of active ExecCtxs.
+ // Will block until a pending fork is complete if one is in progress.
+ void IncExecCtxCount();
+
+ // Decrement the count of active ExecCtxs
+ void DecExecCtxCount();
+
+ // Check if there is a single active ExecCtx
+ // (the one used to invoke this function). If there are more,
+ // return false. Otherwise, return true and block creation of
+ // more ExecCtx s until AlloWExecCtx() is called
+ //
+ bool BlockExecCtx();
+ void AllowExecCtx();
+
+ // Increment the count of active threads.
+ void IncThreadCount();
+
+ // Decrement the count of active threads.
+ void DecThreadCount();
+
+ // Await all core threads to be joined.
+ void AwaitThreads();
+
+ // Test only: overrides environment variables/compile flags
+ // Must be called before grpc_init()
+ void Enable(bool enable);
+
+ private:
+ static ExecCtxState* execCtxState_ = nullptr;
+ static ThreadState* threadState_ = nullptr;
+ static bool supportEnabled_ = false;
+ static int overrideEnabled_ = -1;
+}
+
+} // namespace internal
+} // namespace grpc_core
#endif /* GRPC_CORE_LIB_GPR_FORK_H */
diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h
index 05c7ded45f..caf0652c1a 100644
--- a/src/core/lib/gprpp/thd.h
+++ b/src/core/lib/gprpp/thd.h
@@ -111,9 +111,6 @@ class Thread {
}
};
- static void Init();
- static bool AwaitAll(gpr_timespec deadline);
-
private:
Thread(const Thread&) = delete;
Thread& operator=(const Thread&) = delete;
diff --git a/src/core/lib/gprpp/thd_posix.cc b/src/core/lib/gprpp/thd_posix.cc
index 2f6c2edcae..1a20a0bbc3 100644
--- a/src/core/lib/gprpp/thd_posix.cc
+++ b/src/core/lib/gprpp/thd_posix.cc
@@ -38,11 +38,6 @@
namespace grpc_core {
namespace {
-gpr_mu g_mu;
-gpr_cv g_cv;
-int g_thread_count;
-int g_awaiting_threads;
-
class ThreadInternalsPosix;
struct thd_arg {
ThreadInternalsPosix* thread;
@@ -68,7 +63,7 @@ class ThreadInternalsPosix
info->body = thd_body;
info->arg = arg;
info->name = thd_name;
- inc_thd_count();
+ grpc_fork_inc_thd_count();
GPR_ASSERT(pthread_attr_init(&attr) == 0);
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
@@ -103,7 +98,7 @@ class ThreadInternalsPosix
gpr_mu_unlock(&arg.thread->mu_);
(*arg.body)(arg.arg);
- dec_thd_count();
+ grpc_fork_dec_thd_count();
return nullptr;
},
info) == 0);
@@ -113,7 +108,7 @@ class ThreadInternalsPosix
if (!success) {
/* don't use gpr_free, as this was allocated using malloc (see above) */
free(info);
- dec_thd_count();
+ grpc_fork_dec_thd_count();
}
};
@@ -132,29 +127,6 @@ class ThreadInternalsPosix
void Join() override { pthread_join(pthread_id_, nullptr); }
private:
- /*****************************************
- * Only used when fork support is enabled
- */
-
- static void inc_thd_count() {
- if (grpc_fork_support_enabled()) {
- gpr_mu_lock(&g_mu);
- g_thread_count++;
- gpr_mu_unlock(&g_mu);
- }
- }
-
- static void dec_thd_count() {
- if (grpc_fork_support_enabled()) {
- gpr_mu_lock(&g_mu);
- g_thread_count--;
- if (g_awaiting_threads && g_thread_count == 0) {
- gpr_cv_signal(&g_cv);
- }
- gpr_mu_unlock(&g_mu);
- }
- }
-
gpr_mu mu_;
gpr_cv ready_;
bool started_;
@@ -180,27 +152,6 @@ Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
*success = outcome;
}
}
-
-void Thread::Init() {
- gpr_mu_init(&g_mu);
- gpr_cv_init(&g_cv);
- g_thread_count = 0;
- g_awaiting_threads = 0;
-}
-
-bool Thread::AwaitAll(gpr_timespec deadline) {
- gpr_mu_lock(&g_mu);
- g_awaiting_threads = 1;
- int res = 0;
- while ((g_thread_count > 0) &&
- (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0)) {
- res = gpr_cv_wait(&g_cv, &g_mu, deadline);
- }
- g_awaiting_threads = 0;
- gpr_mu_unlock(&g_mu);
- return res == 0;
-}
-
} // namespace grpc_core
// The following is in the external namespace as it is exposed as C89 API
diff --git a/src/core/lib/gprpp/thd_windows.cc b/src/core/lib/gprpp/thd_windows.cc
index 59ea02f3d2..71584fd358 100644
--- a/src/core/lib/gprpp/thd_windows.cc
+++ b/src/core/lib/gprpp/thd_windows.cc
@@ -131,13 +131,6 @@ class ThreadInternalsWindows
namespace grpc_core {
-void Thread::Init() {}
-
-bool Thread::AwaitAll(gpr_timespec deadline) {
- // TODO: Consider adding this if needed
- return false;
-}
-
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success) {
bool outcome = false;
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index de97164f02..b5b59621f9 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -25,6 +25,7 @@
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
+#include "src/core/lib/gpr/fork.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/iomgr/closure.h"
@@ -76,16 +77,23 @@ class ExecCtx {
public:
/** Default Constructor */
- ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) { Set(this); }
+ ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
+ grpc_fork_inc_exec_ctx_count();
+ Set(this);
+ }
/** Parameterised Constructor */
- ExecCtx(uintptr_t fl) : flags_(fl) { Set(this); }
+ ExecCtx(uintptr_t fl) : flags_(fl) {
+ grpc_fork_inc_exec_ctx_count();
+ Set(this);
+ }
/** Destructor */
virtual ~ExecCtx() {
flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
Flush();
Set(last_exec_ctx_);
+ grpc_fork_dec_exec_ctx_count();
}
/** Disallow copy and assignment operators */
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
index f8645ab157..dd0f9f612f 100644
--- a/src/core/lib/iomgr/fork_posix.cc
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -41,47 +41,59 @@
* AROUND VERY SPECIFIC USE CASES.
*/
+namespace {
+bool skipped_handler = true;
+bool registered_handlers = false;
+} // namespace
+
void grpc_prefork() {
+ grpc_core::ExecCtx exec_ctx;
+ skipped_handler = true;
+ if (!grpc_is_initialized()) {
+ return;
+ }
if (!grpc_fork_support_enabled()) {
gpr_log(GPR_ERROR,
"Fork support not enabled; try running with the "
"environment variable GRPC_ENABLE_FORK_SUPPORT=1");
return;
}
- if (grpc_is_initialized()) {
- grpc_core::ExecCtx exec_ctx;
- grpc_timer_manager_set_threading(false);
- grpc_executor_set_threading(false);
- grpc_core::ExecCtx::Get()->Flush();
- if (!grpc_core::Thread::AwaitAll(
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(3, GPR_TIMESPAN)))) {
- gpr_log(GPR_ERROR, "gRPC thread still active! Cannot fork!");
- }
+ if (!grpc_fork_block_exec_ctx()) {
+ gpr_log(GPR_INFO,
+ "Other threads are currently calling into gRPC, skipping fork() "
+ "handlers");
+ return;
}
+ grpc_timer_manager_set_threading(false);
+ grpc_executor_set_threading(false);
+ grpc_core::ExecCtx::Get()->Flush();
+ grpc_fork_await_thds();
+ skipped_handler = false;
}
void grpc_postfork_parent() {
- if (grpc_is_initialized()) {
- grpc_timer_manager_set_threading(true);
+ if (!skipped_handler) {
+ grpc_fork_allow_exec_ctx();
grpc_core::ExecCtx exec_ctx;
+ grpc_timer_manager_set_threading(true);
grpc_executor_set_threading(true);
}
}
void grpc_postfork_child() {
- if (grpc_is_initialized()) {
- grpc_timer_manager_set_threading(true);
+ if (!skipped_handler) {
+ grpc_fork_allow_exec_ctx();
grpc_core::ExecCtx exec_ctx;
+ grpc_timer_manager_set_threading(true);
grpc_executor_set_threading(true);
- grpc_core::ExecCtx::Get()->Flush();
}
}
void grpc_fork_handlers_auto_register() {
- if (grpc_fork_support_enabled()) {
+ if (grpc_fork_support_enabled() & !registered_handlers) {
#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
pthread_atfork(grpc_prefork, grpc_postfork_parent, grpc_postfork_child);
+ registered_handlers = true;
#endif // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
}
}
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index ac9f9e6066..285e6f7ddc 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -64,12 +64,10 @@ static int g_initializations;
static void do_basic_init(void) {
gpr_log_verbosity_init();
- grpc_fork_support_init();
gpr_mu_init(&g_init_mu);
grpc_register_built_in_plugins();
grpc_cq_global_init();
g_initializations = 0;
- grpc_fork_handlers_auto_register();
}
static bool append_filter(grpc_channel_stack_builder* builder, void* arg) {
@@ -122,8 +120,9 @@ void grpc_init(void) {
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
+ grpc_fork_support_init();
+ grpc_fork_handlers_auto_register();
gpr_time_init();
- grpc_core::Thread::Init();
grpc_stats_init();
grpc_slice_intern_init();
grpc_mdctx_global_init();
@@ -177,6 +176,7 @@ void grpc_shutdown(void) {
grpc_handshaker_factory_registry_shutdown();
grpc_slice_intern_shutdown();
grpc_stats_shutdown();
+ grpc_fork_support_destroy();
}
grpc_core::ExecCtx::GlobalShutdown();
}