aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/gprpp/fork.cc73
-rw-r--r--src/core/lib/gprpp/fork.h17
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc72
-rw-r--r--src/core/lib/iomgr/fork_posix.cc5
4 files changed, 130 insertions, 37 deletions
diff --git a/src/core/lib/gprpp/fork.cc b/src/core/lib/gprpp/fork.cc
index f6d9a87d2c..0288c39680 100644
--- a/src/core/lib/gprpp/fork.cc
+++ b/src/core/lib/gprpp/fork.cc
@@ -157,11 +157,11 @@ class ThreadState {
} // namespace
void Fork::GlobalInit() {
- if (!overrideEnabled_) {
+ if (!override_enabled_) {
#ifdef GRPC_ENABLE_FORK_SUPPORT
- supportEnabled_ = true;
+ support_enabled_ = true;
#else
- supportEnabled_ = false;
+ support_enabled_ = false;
#endif
bool env_var_set = false;
char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT");
@@ -172,7 +172,7 @@ void Fork::GlobalInit() {
"False", "FALSE", "0"};
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == strcmp(env, truthy[i])) {
- supportEnabled_ = true;
+ support_enabled_ = true;
env_var_set = true;
break;
}
@@ -180,7 +180,7 @@ void Fork::GlobalInit() {
if (!env_var_set) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) {
if (0 == strcmp(env, falsey[i])) {
- supportEnabled_ = false;
+ support_enabled_ = false;
env_var_set = true;
break;
}
@@ -189,72 +189,79 @@ void Fork::GlobalInit() {
gpr_free(env);
}
}
- if (supportEnabled_) {
- execCtxState_ = grpc_core::New<internal::ExecCtxState>();
- threadState_ = grpc_core::New<internal::ThreadState>();
+ if (support_enabled_) {
+ exec_ctx_state_ = grpc_core::New<internal::ExecCtxState>();
+ thread_state_ = grpc_core::New<internal::ThreadState>();
}
}
void Fork::GlobalShutdown() {
- if (supportEnabled_) {
- grpc_core::Delete(execCtxState_);
- grpc_core::Delete(threadState_);
+ if (support_enabled_) {
+ grpc_core::Delete(exec_ctx_state_);
+ grpc_core::Delete(thread_state_);
}
}
-bool Fork::Enabled() { return supportEnabled_; }
+bool Fork::Enabled() { return support_enabled_; }
// Testing Only
void Fork::Enable(bool enable) {
- overrideEnabled_ = true;
- supportEnabled_ = enable;
+ override_enabled_ = true;
+ support_enabled_ = enable;
}
void Fork::IncExecCtxCount() {
- if (supportEnabled_) {
- execCtxState_->IncExecCtxCount();
+ if (support_enabled_) {
+ exec_ctx_state_->IncExecCtxCount();
}
}
void Fork::DecExecCtxCount() {
- if (supportEnabled_) {
- execCtxState_->DecExecCtxCount();
+ if (support_enabled_) {
+ exec_ctx_state_->DecExecCtxCount();
}
}
+void Fork::SetResetChildPollingEngineFunc(Fork::child_postfork_func func) {
+ reset_child_polling_engine_ = func;
+}
+Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
+ return reset_child_polling_engine_;
+}
+
bool Fork::BlockExecCtx() {
- if (supportEnabled_) {
- return execCtxState_->BlockExecCtx();
+ if (support_enabled_) {
+ return exec_ctx_state_->BlockExecCtx();
}
return false;
}
void Fork::AllowExecCtx() {
- if (supportEnabled_) {
- execCtxState_->AllowExecCtx();
+ if (support_enabled_) {
+ exec_ctx_state_->AllowExecCtx();
}
}
void Fork::IncThreadCount() {
- if (supportEnabled_) {
- threadState_->IncThreadCount();
+ if (support_enabled_) {
+ thread_state_->IncThreadCount();
}
}
void Fork::DecThreadCount() {
- if (supportEnabled_) {
- threadState_->DecThreadCount();
+ if (support_enabled_) {
+ thread_state_->DecThreadCount();
}
}
void Fork::AwaitThreads() {
- if (supportEnabled_) {
- threadState_->AwaitThreads();
+ if (support_enabled_) {
+ thread_state_->AwaitThreads();
}
}
-internal::ExecCtxState* Fork::execCtxState_ = nullptr;
-internal::ThreadState* Fork::threadState_ = nullptr;
-bool Fork::supportEnabled_ = false;
-bool Fork::overrideEnabled_ = false;
-
+internal::ExecCtxState* Fork::exec_ctx_state_ = nullptr;
+internal::ThreadState* Fork::thread_state_ = nullptr;
+bool Fork::support_enabled_ = false;
+bool Fork::override_enabled_ = false;
+Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr;
} // namespace grpc_core
diff --git a/src/core/lib/gprpp/fork.h b/src/core/lib/gprpp/fork.h
index 123e22c4c6..5a7404f0d9 100644
--- a/src/core/lib/gprpp/fork.h
+++ b/src/core/lib/gprpp/fork.h
@@ -33,6 +33,8 @@ class ThreadState;
class Fork {
public:
+ typedef void (*child_postfork_func)(void);
+
static void GlobalInit();
static void GlobalShutdown();
@@ -46,6 +48,12 @@ class Fork {
// Decrement the count of active ExecCtxs
static void DecExecCtxCount();
+ // Provide a function that will be invoked in the child's postfork handler to
+ // reset the polling engine's internal state.
+ static void SetResetChildPollingEngineFunc(
+ child_postfork_func reset_child_polling_engine);
+ static child_postfork_func GetResetChildPollingEngineFunc();
+
// Check if there is a single active ExecCtx
// (the one used to invoke this function). If there are more,
// return false. Otherwise, return true and block creation of
@@ -68,10 +76,11 @@ class Fork {
static void Enable(bool enable);
private:
- static internal::ExecCtxState* execCtxState_;
- static internal::ThreadState* threadState_;
- static bool supportEnabled_;
- static bool overrideEnabled_;
+ static internal::ExecCtxState* exec_ctx_state_;
+ static internal::ThreadState* thread_state_;
+ static bool support_enabled_;
+ static bool override_enabled_;
+ static child_postfork_func reset_child_polling_engine_;
};
} // namespace grpc_core
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 66e0f1fd6d..aa5016bd8f 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -131,6 +131,13 @@ static void epoll_set_shutdown() {
* Fd Declarations
*/
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+struct grpc_fork_fd_list {
+ grpc_fd* fd;
+ grpc_fd* next;
+ grpc_fd* prev;
+};
+
struct grpc_fd {
int fd;
@@ -141,6 +148,9 @@ struct grpc_fd {
struct grpc_fd* freelist_next;
grpc_iomgr_object iomgr_object;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
};
static void fd_global_init(void);
@@ -256,6 +266,10 @@ static bool append_error(grpc_error** composite, grpc_error* error,
static grpc_fd* fd_freelist = nullptr;
static gpr_mu fd_freelist_mu;
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+static grpc_fd* fork_fd_list_head = nullptr;
+static gpr_mu fork_fd_list_mu;
+
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
static void fd_global_shutdown(void) {
@@ -269,6 +283,38 @@ static void fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
+static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ fd->fork_fd_list =
+ static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->next = fork_fd_list_head;
+ fd->fork_fd_list->prev = nullptr;
+ if (fork_fd_list_head != nullptr) {
+ fork_fd_list_head->fork_fd_list->prev = fd;
+ }
+ fork_fd_list_head = fd;
+ gpr_mu_unlock(&fork_fd_list_mu);
+ }
+}
+
+static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ if (fork_fd_list_head == fd) {
+ fork_fd_list_head = fd->fork_fd_list->next;
+ }
+ if (fd->fork_fd_list->prev != nullptr) {
+ fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
+ }
+ if (fd->fork_fd_list->next != nullptr) {
+ fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
+ }
+ gpr_free(fd->fork_fd_list);
+ gpr_mu_unlock(&fork_fd_list_mu);
+ }
+}
+
static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_fd* new_fd = nullptr;
@@ -295,6 +341,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
char* fd_name;
gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
+ fork_fd_list_add_grpc_fd(new_fd);
#ifndef NDEBUG
if (grpc_trace_fd_refcount.enabled()) {
gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
@@ -361,6 +408,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error));
grpc_iomgr_unregister_object(&fd->iomgr_object);
+ fork_fd_list_remove_grpc_fd(fd);
fd->read_closure->DestroyEvent();
fd->write_closure->DestroyEvent();
fd->error_closure->DestroyEvent();
@@ -1190,6 +1238,10 @@ static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
epoll_set_shutdown();
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_destroy(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
+ }
}
static const grpc_event_engine_vtable vtable = {
@@ -1227,6 +1279,21 @@ static const grpc_event_engine_vtable vtable = {
shutdown_engine,
};
+/* Called by the child process's post-fork handler to close open fds, including
+ * the global epoll fd. This allows gRPC to shutdown in the child process
+ * without interfering with connections or RPCs ongoing in the parent. */
+static void reset_event_manager_on_fork() {
+ gpr_mu_lock(&fork_fd_list_mu);
+ while (fork_fd_list_head != nullptr) {
+ close(fork_fd_list_head->fd);
+ fork_fd_list_head->fd = -1;
+ fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
+ }
+ gpr_mu_unlock(&fork_fd_list_mu);
+ shutdown_engine();
+ grpc_init_epoll1_linux(true);
+}
+
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
* support is available */
@@ -1248,6 +1315,11 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
return nullptr;
}
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_init(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(
+ reset_event_manager_on_fork);
+ }
return &vtable;
}
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
index b37384b8db..a5b61fb4ce 100644
--- a/src/core/lib/iomgr/fork_posix.cc
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -84,6 +84,11 @@ void grpc_postfork_child() {
if (!skipped_handler) {
grpc_core::Fork::AllowExecCtx();
grpc_core::ExecCtx exec_ctx;
+ grpc_core::Fork::child_postfork_func reset_polling_engine =
+ grpc_core::Fork::GetResetChildPollingEngineFunc();
+ if (reset_polling_engine != nullptr) {
+ reset_polling_engine();
+ }
grpc_timer_manager_set_threading(true);
grpc_executor_set_threading(true);
}