aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/ev_poll_posix.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/ev_poll_posix.cc')
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc114
1 files changed, 113 insertions, 1 deletions
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index fb4c71ef71..16562538a6 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -60,6 +60,19 @@ typedef struct grpc_fd_watcher {
grpc_fd* fd;
} grpc_fd_watcher;
+typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
+
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+struct grpc_fork_fd_list {
+ /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
+ set to nullptr. */
+ grpc_fd* fd;
+ grpc_cached_wakeup_fd* cached_wakeup_fd;
+
+ grpc_fork_fd_list* next;
+ grpc_fork_fd_list* prev;
+};
+
struct grpc_fd {
int fd;
/* refst format:
@@ -108,8 +121,18 @@ struct grpc_fd {
grpc_closure* on_done_closure;
grpc_iomgr_object iomgr_object;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
};
+/* True when GRPC_ENABLE_FORK_SUPPORT=1. We do not support fork with poll-cv */
+static bool track_fds_for_fork = false;
+
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+static grpc_fork_fd_list* fork_fd_list_head = nullptr;
+static gpr_mu fork_fd_list_mu;
+
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
or writability interest changes, the pollset can be kicked to pick up that
@@ -156,6 +179,9 @@ static void fd_unref(grpc_fd* fd);
typedef struct grpc_cached_wakeup_fd {
grpc_wakeup_fd fd;
struct grpc_cached_wakeup_fd* next;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
} grpc_cached_wakeup_fd;
struct grpc_pollset_worker {
@@ -281,9 +307,61 @@ poll_hash_table poll_cache;
grpc_cv_fd_table g_cvfds;
/*******************************************************************************
- * fd_posix.c
+ * functions to track opened fds. No-ops unless track_fds_for_fork is true.
*/
+static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
+ if (track_fds_for_fork) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ if (fork_fd_list_head == node) {
+ fork_fd_list_head = node->next;
+ }
+ if (node->prev != nullptr) {
+ node->prev->next = node->next;
+ }
+ if (node->next != nullptr) {
+ node->next->prev = node->prev;
+ }
+ gpr_free(node);
+ gpr_mu_unlock(&fork_fd_list_mu);
+ }
+}
+
+static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ node->next = fork_fd_list_head;
+ node->prev = nullptr;
+ if (fork_fd_list_head != nullptr) {
+ fork_fd_list_head->prev = node;
+ }
+ fork_fd_list_head = node;
+ gpr_mu_unlock(&fork_fd_list_mu);
+}
+
+static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
+ if (track_fds_for_fork) {
+ fd->fork_fd_list =
+ static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->fd = fd;
+ fd->fork_fd_list->cached_wakeup_fd = nullptr;
+ fork_fd_list_add_node(fd->fork_fd_list);
+ }
+}
+
+static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
+ if (track_fds_for_fork) {
+ fd->fork_fd_list =
+ static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->cached_wakeup_fd = fd;
+ fd->fork_fd_list->fd = nullptr;
+ fork_fd_list_add_node(fd->fork_fd_list);
+ }
+}
+
+ /*******************************************************************************
+ * fd_posix.c
+ */
+
#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
@@ -319,6 +397,7 @@ static void unref_by(grpc_fd* fd, int n) {
if (old == n) {
gpr_mu_destroy(&fd->mu);
grpc_iomgr_unregister_object(&fd->iomgr_object);
+ fork_fd_list_remove_node(fd->fork_fd_list);
if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_free(fd);
} else {
@@ -347,6 +426,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
gpr_asprintf(&name2, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&r->iomgr_object, name2);
gpr_free(name2);
+ fork_fd_list_add_grpc_fd(r);
return r;
}
@@ -822,6 +902,7 @@ static void pollset_destroy(grpc_pollset* pollset) {
GPR_ASSERT(!pollset_has_workers(pollset));
while (pollset->local_wakeup_cache) {
grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
+ fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
gpr_free(pollset->local_wakeup_cache);
pollset->local_wakeup_cache = next;
@@ -895,6 +976,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
gpr_malloc(sizeof(*worker.wakeup_fd)));
error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
if (error != GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
return error;
@@ -1705,6 +1787,10 @@ static void shutdown_engine(void) {
if (grpc_cv_wakeup_fds_enabled()) {
global_cv_fd_table_shutdown();
}
+ if (track_fds_for_fork) {
+ gpr_mu_destroy(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
+ }
}
static const grpc_event_engine_vtable vtable = {
@@ -1742,6 +1828,26 @@ static const grpc_event_engine_vtable vtable = {
shutdown_engine,
};
+/* Called by the child process's post-fork handler to close open fds, including
+ * worker wakeup fds. This allows gRPC to shutdown in the child process without
+ * interfering with connections or RPCs ongoing in the parent. */
+static void reset_event_manager_on_fork() {
+ gpr_mu_lock(&fork_fd_list_mu);
+ while (fork_fd_list_head != nullptr) {
+ if (fork_fd_list_head->fd != nullptr) {
+ close(fork_fd_list_head->fd->fd);
+ fork_fd_list_head->fd->fd = -1;
+ } else {
+ close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
+ fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
+ close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
+ fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
+ }
+ fork_fd_list_head = fork_fd_list_head->next;
+ }
+ gpr_mu_unlock(&fork_fd_list_mu);
+}
+
const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!grpc_has_wakeup_fd()) {
gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
@@ -1750,6 +1856,12 @@ const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
return nullptr;
}
+ if (grpc_core::Fork::Enabled()) {
+ track_fds_for_fork = true;
+ gpr_mu_init(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(
+ reset_event_manager_on_fork);
+ }
return &vtable;
}