diff options
author | 2018-06-25 10:25:28 -0700 | |
---|---|---|
committer | 2018-08-22 10:34:54 -0700 | |
commit | f8cf7ee56d4150ae870f44298a406f7b2ca038c0 (patch) | |
tree | 394c613d5b08d05a9e2f4494bcd59817e538ff10 /src/core | |
parent | f10596f0f3d6ab2dfe67b86d259a8d3effb73a99 (diff) |
Support gRPC Python client-side fork with epoll1
A process may fork after invoking grpc_init() and use gRPC in the child
if and only if the child process first destroys all gRPC resources
inherited from the parent process and invokes grpc_shutdown().
Subsequent to this, the child will be able to re-initialize and use
gRPC. After fork, the parent process will be able to continue to use
existing gRPC resources such as channels and calls without interference
from the child process.
To facilitate gRPC Python applications meeting the above constraints,
gRPC Python will automatically destroy and shutdown all gRPC Core
resources in the child's post-fork handler, including cancelling
in-flight calls (see detailed design below). From the client's
perspective, the child process is now free to create new channels and
use gRPC.
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/gprpp/fork.cc | 73 | ||||
-rw-r--r-- | src/core/lib/gprpp/fork.h | 17 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.cc | 72 | ||||
-rw-r--r-- | src/core/lib/iomgr/fork_posix.cc | 5 |
4 files changed, 130 insertions, 37 deletions
diff --git a/src/core/lib/gprpp/fork.cc b/src/core/lib/gprpp/fork.cc index f6d9a87d2c..0288c39680 100644 --- a/src/core/lib/gprpp/fork.cc +++ b/src/core/lib/gprpp/fork.cc @@ -157,11 +157,11 @@ class ThreadState { } // namespace void Fork::GlobalInit() { - if (!overrideEnabled_) { + if (!override_enabled_) { #ifdef GRPC_ENABLE_FORK_SUPPORT - supportEnabled_ = true; + support_enabled_ = true; #else - supportEnabled_ = false; + support_enabled_ = false; #endif bool env_var_set = false; char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT"); @@ -172,7 +172,7 @@ void Fork::GlobalInit() { "False", "FALSE", "0"}; for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { if (0 == strcmp(env, truthy[i])) { - supportEnabled_ = true; + support_enabled_ = true; env_var_set = true; break; } @@ -180,7 +180,7 @@ void Fork::GlobalInit() { if (!env_var_set) { for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) { if (0 == strcmp(env, falsey[i])) { - supportEnabled_ = false; + support_enabled_ = false; env_var_set = true; break; } @@ -189,72 +189,79 @@ void Fork::GlobalInit() { gpr_free(env); } } - if (supportEnabled_) { - execCtxState_ = grpc_core::New<internal::ExecCtxState>(); - threadState_ = grpc_core::New<internal::ThreadState>(); + if (support_enabled_) { + exec_ctx_state_ = grpc_core::New<internal::ExecCtxState>(); + thread_state_ = grpc_core::New<internal::ThreadState>(); } } void Fork::GlobalShutdown() { - if (supportEnabled_) { - grpc_core::Delete(execCtxState_); - grpc_core::Delete(threadState_); + if (support_enabled_) { + grpc_core::Delete(exec_ctx_state_); + grpc_core::Delete(thread_state_); } } -bool Fork::Enabled() { return supportEnabled_; } +bool Fork::Enabled() { return support_enabled_; } // Testing Only void Fork::Enable(bool enable) { - overrideEnabled_ = true; - supportEnabled_ = enable; + override_enabled_ = true; + support_enabled_ = enable; } void Fork::IncExecCtxCount() { - if (supportEnabled_) { - execCtxState_->IncExecCtxCount(); + if (support_enabled_) { + exec_ctx_state_->IncExecCtxCount(); } } void Fork::DecExecCtxCount() { - if (supportEnabled_) { - execCtxState_->DecExecCtxCount(); + if (support_enabled_) { + exec_ctx_state_->DecExecCtxCount(); } } +void Fork::SetResetChildPollingEngineFunc(Fork::child_postfork_func func) { + reset_child_polling_engine_ = func; +} +Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() { + return reset_child_polling_engine_; +} + bool Fork::BlockExecCtx() { - if (supportEnabled_) { - return execCtxState_->BlockExecCtx(); + if (support_enabled_) { + return exec_ctx_state_->BlockExecCtx(); } return false; } void Fork::AllowExecCtx() { - if (supportEnabled_) { - execCtxState_->AllowExecCtx(); + if (support_enabled_) { + exec_ctx_state_->AllowExecCtx(); } } void Fork::IncThreadCount() { - if (supportEnabled_) { - threadState_->IncThreadCount(); + if (support_enabled_) { + thread_state_->IncThreadCount(); } } void Fork::DecThreadCount() { - if (supportEnabled_) { - threadState_->DecThreadCount(); + if (support_enabled_) { + thread_state_->DecThreadCount(); } } void Fork::AwaitThreads() { - if (supportEnabled_) { - threadState_->AwaitThreads(); + if (support_enabled_) { + thread_state_->AwaitThreads(); } } -internal::ExecCtxState* Fork::execCtxState_ = nullptr; -internal::ThreadState* Fork::threadState_ = nullptr; -bool Fork::supportEnabled_ = false; -bool Fork::overrideEnabled_ = false; - +internal::ExecCtxState* Fork::exec_ctx_state_ = nullptr; +internal::ThreadState* Fork::thread_state_ = nullptr; +bool Fork::support_enabled_ = false; +bool Fork::override_enabled_ = false; +Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr; } // namespace grpc_core diff --git a/src/core/lib/gprpp/fork.h b/src/core/lib/gprpp/fork.h index 123e22c4c6..5a7404f0d9 100644 --- a/src/core/lib/gprpp/fork.h +++ b/src/core/lib/gprpp/fork.h @@ -33,6 +33,8 @@ class ThreadState; class Fork { public: + typedef void (*child_postfork_func)(void); + static void GlobalInit(); static void GlobalShutdown(); @@ -46,6 +48,12 @@ class Fork { // Decrement the count of active ExecCtxs static void DecExecCtxCount(); + // Provide a function that will be invoked in the child's postfork handler to + // reset the polling engine's internal state. + static void SetResetChildPollingEngineFunc( + child_postfork_func reset_child_polling_engine); + static child_postfork_func GetResetChildPollingEngineFunc(); + // Check if there is a single active ExecCtx // (the one used to invoke this function). If there are more, // return false. Otherwise, return true and block creation of @@ -68,10 +76,11 @@ class Fork { static void Enable(bool enable); private: - static internal::ExecCtxState* execCtxState_; - static internal::ThreadState* threadState_; - static bool supportEnabled_; - static bool overrideEnabled_; + static internal::ExecCtxState* exec_ctx_state_; + static internal::ThreadState* thread_state_; + static bool support_enabled_; + static bool override_enabled_; + static child_postfork_func reset_child_polling_engine_; }; } // namespace grpc_core diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 66e0f1fd6d..aa5016bd8f 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -131,6 +131,13 @@ static void epoll_set_shutdown() { * Fd Declarations */ +/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */ +struct grpc_fork_fd_list { + grpc_fd* fd; + grpc_fd* next; + grpc_fd* prev; +}; + struct grpc_fd { int fd; @@ -141,6 +148,9 @@ struct grpc_fd { struct grpc_fd* freelist_next; grpc_iomgr_object iomgr_object; + + /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */ + grpc_fork_fd_list* fork_fd_list; }; static void fd_global_init(void); @@ -256,6 +266,10 @@ static bool append_error(grpc_error** composite, grpc_error* error, static grpc_fd* fd_freelist = nullptr; static gpr_mu fd_freelist_mu; +/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */ +static grpc_fd* fork_fd_list_head = nullptr; +static gpr_mu fork_fd_list_mu; + static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); } static void fd_global_shutdown(void) { @@ -269,6 +283,38 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } +static void fork_fd_list_add_grpc_fd(grpc_fd* fd) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + fd->fork_fd_list = + static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list))); + fd->fork_fd_list->next = fork_fd_list_head; + fd->fork_fd_list->prev = nullptr; + if (fork_fd_list_head != nullptr) { + fork_fd_list_head->fork_fd_list->prev = fd; + } + fork_fd_list_head = fd; + gpr_mu_unlock(&fork_fd_list_mu); + } +} + +static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) { + if (grpc_core::Fork::Enabled()) { + gpr_mu_lock(&fork_fd_list_mu); + if (fork_fd_list_head == fd) { + fork_fd_list_head = fd->fork_fd_list->next; + } + if (fd->fork_fd_list->prev != nullptr) { + fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next; + } + if (fd->fork_fd_list->next != nullptr) { + fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev; + } + gpr_free(fd->fork_fd_list); + gpr_mu_unlock(&fork_fd_list_mu); + } +} + static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; @@ -295,6 +341,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) { char* fd_name; gpr_asprintf(&fd_name, "%s fd=%d", name, fd); grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name); + fork_fd_list_add_grpc_fd(new_fd); #ifndef NDEBUG if (grpc_trace_fd_refcount.enabled()) { gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name); @@ -361,6 +408,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error)); grpc_iomgr_unregister_object(&fd->iomgr_object); + fork_fd_list_remove_grpc_fd(fd); fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); fd->error_closure->DestroyEvent(); @@ -1190,6 +1238,10 @@ static void shutdown_engine(void) { fd_global_shutdown(); pollset_global_shutdown(); epoll_set_shutdown(); + if (grpc_core::Fork::Enabled()) { + gpr_mu_destroy(&fork_fd_list_mu); + grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr); + } } static const grpc_event_engine_vtable vtable = { @@ -1227,6 +1279,21 @@ static const grpc_event_engine_vtable vtable = { shutdown_engine, }; +/* Called by the child process's post-fork handler to close open fds, including + * the global epoll fd. This allows gRPC to shutdown in the child process + * without interfering with connections or RPCs ongoing in the parent. */ +static void reset_event_manager_on_fork() { + gpr_mu_lock(&fork_fd_list_mu); + while (fork_fd_list_head != nullptr) { + close(fork_fd_list_head->fd); + fork_fd_list_head->fd = -1; + fork_fd_list_head = fork_fd_list_head->fork_fd_list->next; + } + gpr_mu_unlock(&fork_fd_list_mu); + shutdown_engine(); + grpc_init_epoll1_linux(true); +} + /* It is possible that GLIBC has epoll but the underlying kernel doesn't. * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll * support is available */ @@ -1248,6 +1315,11 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { return nullptr; } + if (grpc_core::Fork::Enabled()) { + gpr_mu_init(&fork_fd_list_mu); + grpc_core::Fork::SetResetChildPollingEngineFunc( + reset_event_manager_on_fork); + } return &vtable; } diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc index b37384b8db..a5b61fb4ce 100644 --- a/src/core/lib/iomgr/fork_posix.cc +++ b/src/core/lib/iomgr/fork_posix.cc @@ -84,6 +84,11 @@ void grpc_postfork_child() { if (!skipped_handler) { grpc_core::Fork::AllowExecCtx(); grpc_core::ExecCtx exec_ctx; + grpc_core::Fork::child_postfork_func reset_polling_engine = + grpc_core::Fork::GetResetChildPollingEngineFunc(); + if (reset_polling_engine != nullptr) { + reset_polling_engine(); + } grpc_timer_manager_set_threading(true); grpc_executor_set_threading(true); } |