diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/gprpp/fork.cc | 73 | ||||
-rw-r--r-- | src/core/lib/gprpp/fork.h | 17 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.cc | 72 | ||||
-rw-r--r-- | src/core/lib/iomgr/fork_posix.cc | 5 |
4 files changed, 130 insertions, 37 deletions
diff --git a/src/core/lib/gprpp/fork.cc b/src/core/lib/gprpp/fork.cc index f6d9a87d2c..0288c39680 100644 --- a/src/core/lib/gprpp/fork.cc +++ b/src/core/lib/gprpp/fork.cc @@ -157,11 +157,11 @@ class ThreadState { } // namespace void Fork::GlobalInit() { - if (!overrideEnabled_) { + if (!override_enabled_) { #ifdef GRPC_ENABLE_FORK_SUPPORT - supportEnabled_ = true; + support_enabled_ = true; #else - supportEnabled_ = false; + support_enabled_ = false; #endif bool env_var_set = false; char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT"); @@ -172,7 +172,7 @@ void Fork::GlobalInit() { "False", "FALSE", "0"}; for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { if (0 == strcmp(env, truthy[i])) { - supportEnabled_ = true; + support_enabled_ = true; env_var_set = true; break; } @@ -180,7 +180,7 @@ void Fork::GlobalInit() { if (!env_var_set) { for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) { if (0 == strcmp(env, falsey[i])) { - supportEnabled_ = false; + support_enabled_ = false; env_var_set = true; break; } @@ -189,72 +189,79 @@ void Fork::GlobalInit() { gpr_free(env); } } - if (supportEnabled_) { - execCtxState_ = grpc_core::New<internal::ExecCtxState>(); - threadState_ = grpc_core::New<internal::ThreadState>(); + if (support_enabled_) { + exec_ctx_state_ = grpc_core::New<internal::ExecCtxState>(); + thread_state_ = grpc_core::New<internal::ThreadState>(); } } void Fork::GlobalShutdown() { - if (supportEnabled_) { - grpc_core::Delete(execCtxState_); - grpc_core::Delete(threadState_); + if (support_enabled_) { + grpc_core::Delete(exec_ctx_state_); + grpc_core::Delete(thread_state_); } } -bool Fork::Enabled() { return supportEnabled_; } +bool Fork::Enabled() { return support_enabled_; } // Testing Only void Fork::Enable(bool enable) { - overrideEnabled_ = true; - supportEnabled_ = enable; + override_enabled_ = true; + support_enabled_ = enable; } void Fork::IncExecCtxCount() { - if (supportEnabled_) { - execCtxState_->IncExecCtxCount(); + if (support_enabled_) { + exec_ctx_state_->IncExecCtxCount(); } } void Fork::DecExecCtxCount() { - if (supportEnabled_) { - execCtxState_->DecExecCtxCount(); + if (support_enabled_) { + exec_ctx_state_->DecExecCtxCount(); } } +void Fork::SetResetChildPollingEngineFunc(Fork::child_postfork_func func) { + reset_child_polling_engine_ = func; +} +Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() { + return reset_child_polling_engine_; +} + bool Fork::BlockExecCtx() { - if (supportEnabled_) { - return execCtxState_->BlockExecCtx(); + if (support_enabled_) { + return exec_ctx_state_->BlockExecCtx(); } return false; } void Fork::AllowExecCtx() { - if (supportEnabled_) { - execCtxState_->AllowExecCtx(); + if (support_enabled_) { + exec_ctx_state_->AllowExecCtx(); } } void Fork::IncThreadCount() { - if (supportEnabled_) { - threadState_->IncThreadCount(); + if (support_enabled_) { + thread_state_->IncThreadCount(); } } void Fork::DecThreadCount() { - if (supportEnabled_) { - threadState_->DecThreadCount(); + if (support_enabled_) { + thread_state_->DecThreadCount(); } } void Fork::AwaitThreads() { - if (supportEnabled_) { - threadState_->AwaitThreads(); + if (support_enabled_) { + thread_state_->AwaitThreads(); } } -internal::ExecCtxState* Fork::execCtxState_ = nullptr; -internal::ThreadState* Fork::threadState_ = nullptr; -bool Fork::supportEnabled_ = false; -bool Fork::overrideEnabled_ = false; - +internal::ExecCtxState* Fork::exec_ctx_state_ = nullptr; +internal::ThreadState* Fork::thread_state_ = nullptr; +bool Fork::support_enabled_ = false; +bool Fork::override_enabled_ = false; +Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr; } // namespace grpc_core diff --git a/src/core/lib/gprpp/fork.h b/src/core/lib/gprpp/fork.h index 123e22c4c6..5a7404f0d9 100644 --- a/src/core/lib/gprpp/fork.h +++ b/src/core/lib/gprpp/fork.h @@ -33,6 +33,8 @@ class ThreadState; class Fork { public: + typedef void (*child_postfork_func)(void); + static void GlobalInit(); static void GlobalShutdown(); @@ -46,6 +48,12 @@ class Fork { // Decrement the count of active ExecCtxs static void DecExecCtxCount(); + // Provide a function that will be invoked in the child's postfork handler to + // reset the polling engine's internal state. + static void SetResetChildPollingEngineFunc( + child_postfork_func reset_child_polling_engine); + static child_postfork_func GetResetChildPollingEngineFunc(); + // Check if there is a single active ExecCtx // (the one used to invoke this function). If there are more, // return false. Otherwise, return true and block creation of @@ -68,10 +76,11 @@ class Fork { static void Enable(bool enable); private: - static internal::ExecCtxState* execCtxState_; - static internal::ThreadState* threadState_; - static bool supportEnabled_; - static bool overrideEnabled_; + static internal::ExecCtxState* exec_ctx_state_; + static internal::ThreadState* thread_state_; + static bool support_enabled_; + static bool override_enabled_; + static child_postfork_func reset_child_polling_engine_; }; } // namespace grpc_core diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 66e0f1fd6d..aa5016bd8f 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -131,6 +131,13 @@ static void epoll_set_shutdown() { * Fd Declarations */ +/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */ +struct grpc_fork_fd_list { + grpc_fd* fd; + grpc_fd* next; + grpc_fd* prev; +}; + struct grpc_fd { int fd; @@ -141,6 +148,9 @@ struct grpc_fd { struct grpc_fd* freelist_next; grpc_iomgr_object iomgr_object; + + /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */ + grpc_fork_fd_list* fork_fd_list; }; static void fd_global_init(void); @@ -256,6 +266,10 @@ static bool append_error(grpc_error** composite, grpc_error* error, static grpc_fd* fd_freelist = nullptr; static gpr_mu fd_freelist_mu; +/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */ +static grpc_fd* fork_fd_list_head = nullptr; +static gpr_mu fork_fd_list_mu; + static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); } static void fd_global_shutdown(void) { @@ -269,6 +283,38 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } +static void fork_fd_list_add_grpc_fd(grpc_fd* fd) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + fd->fork_fd_list = + static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list))); + fd->fork_fd_list->next = fork_fd_list_head; + fd->fork_fd_list->prev = nullptr; + if (fork_fd_list_head != nullptr) { + fork_fd_list_head->fork_fd_list->prev = fd; + } + fork_fd_list_head = fd; + gpr_mu_unlock(&fork_fd_list_mu); + } +} + +static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + if (fork_fd_list_head == fd) { + fork_fd_list_head = fd->fork_fd_list->next; + } + if (fd->fork_fd_list->prev != nullptr) { + fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next; + } + if (fd->fork_fd_list->next != nullptr) { + fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev; + } + gpr_free(fd->fork_fd_list); + gpr_mu_unlock(&fork_fd_list_mu); + } +} + static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; @@ -295,6 +341,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) { char* fd_name; gpr_asprintf(&fd_name, "%s fd=%d", name, fd); grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name); + fork_fd_list_add_grpc_fd(new_fd); #ifndef NDEBUG if (grpc_trace_fd_refcount.enabled()) { gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name); @@ -361,6 +408,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error)); grpc_iomgr_unregister_object(&fd->iomgr_object); + fork_fd_list_remove_grpc_fd(fd); fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); fd->error_closure->DestroyEvent(); @@ -1190,6 +1238,10 @@ static void shutdown_engine(void) { fd_global_shutdown(); pollset_global_shutdown(); epoll_set_shutdown(); + if (grpc_core::Fork::Enabled()) { + gpr_mu_destroy(&fork_fd_list_mu); + grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr); + } } static const grpc_event_engine_vtable vtable = { @@ -1227,6 +1279,21 @@ static const grpc_event_engine_vtable vtable = { shutdown_engine, }; +/* Called by the child process's post-fork handler to close open fds, including + * the global epoll fd. This allows gRPC to shutdown in the child process + * without interfering with connections or RPCs ongoing in the parent. */ +static void reset_event_manager_on_fork() { + gpr_mu_lock(&fork_fd_list_mu); + while (fork_fd_list_head != nullptr) { + close(fork_fd_list_head->fd); + fork_fd_list_head->fd = -1; + fork_fd_list_head = fork_fd_list_head->fork_fd_list->next; + } + gpr_mu_unlock(&fork_fd_list_mu); + shutdown_engine(); + grpc_init_epoll1_linux(true); +} + /* It is possible that GLIBC has epoll but the underlying kernel doesn't. * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll * support is available */ @@ -1248,6 +1315,11 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { return nullptr; } + if (grpc_core::Fork::Enabled()) { + gpr_mu_init(&fork_fd_list_mu); + grpc_core::Fork::SetResetChildPollingEngineFunc( + reset_event_manager_on_fork); + } return &vtable; } diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc index b37384b8db..a5b61fb4ce 100644 --- a/src/core/lib/iomgr/fork_posix.cc +++ b/src/core/lib/iomgr/fork_posix.cc @@ -84,6 +84,11 @@ void grpc_postfork_child() { if (!skipped_handler) { grpc_core::Fork::AllowExecCtx(); grpc_core::ExecCtx exec_ctx; + grpc_core::Fork::child_postfork_func reset_polling_engine = + grpc_core::Fork::GetResetChildPollingEngineFunc(); + if (reset_polling_engine != nullptr) { + reset_polling_engine(); + } grpc_timer_manager_set_threading(true); grpc_executor_set_threading(true); } |