aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Eric Gribkoff <ericgribkoff@google.com>2018-08-20 23:00:02 -0700
committerGravatar Eric Gribkoff <ericgribkoff@google.com>2018-08-23 09:40:39 -0700
commitacc020caf77240f7504af9c2c1ea5a0dac1884d6 (patch)
treee516cb3947e1e96372599fd02e727d373c465c7d /src/core
parent77b713324494e183da2bb458f072b0b5f0fb45a5 (diff)
Support tracking and closing fds post-fork in ev_poll_posix
This extends gRPC Python's fork compatibility to Mac OS, which does not support epoll The changes are a no-op if fork support is disabled
Diffstat (limited to 'src/core')
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc108
1 files changed, 108 insertions, 0 deletions
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index fb4c71ef71..dd71ccdd0c 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -60,6 +60,19 @@ typedef struct grpc_fd_watcher {
grpc_fd* fd;
} grpc_fd_watcher;
+typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
+
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+struct grpc_fork_fd_list {
+ /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
+ set to nullptr. */
+ grpc_fd* fd;
+ grpc_cached_wakeup_fd* cached_wakeup_fd;
+
+ grpc_fork_fd_list* next;
+ grpc_fork_fd_list* prev;
+};
+
struct grpc_fd {
int fd;
/* refst format:
@@ -108,8 +121,15 @@ struct grpc_fd {
grpc_closure* on_done_closure;
grpc_iomgr_object iomgr_object;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
};
+/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+static grpc_fork_fd_list* fork_fd_list_head = nullptr;
+static gpr_mu fork_fd_list_mu;
+
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
or writability interest changes, the pollset can be kicked to pick up that
@@ -156,6 +176,9 @@ static void fd_unref(grpc_fd* fd);
typedef struct grpc_cached_wakeup_fd {
grpc_wakeup_fd fd;
struct grpc_cached_wakeup_fd* next;
+
+ /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
+ grpc_fork_fd_list* fork_fd_list;
} grpc_cached_wakeup_fd;
struct grpc_pollset_worker {
@@ -281,6 +304,58 @@ poll_hash_table poll_cache;
grpc_cv_fd_table g_cvfds;
/*******************************************************************************
+ * functions to track opened fds. No-ops unless GRPC_ENABLE_FORK_SUPPORT=1.
+ */
+
+static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ if (fork_fd_list_head == node) {
+ fork_fd_list_head = node->next;
+ }
+ if (node->prev != nullptr) {
+ node->prev->next = node->next;
+ }
+ if (node->next != nullptr) {
+ node->next->prev = node->prev;
+ }
+ gpr_free(node);
+ gpr_mu_unlock(&fork_fd_list_mu);
+ }
+}
+
+static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
+ gpr_mu_lock(&fork_fd_list_mu);
+ node->next = fork_fd_list_head;
+ node->prev = nullptr;
+ if (fork_fd_list_head != nullptr) {
+ fork_fd_list_head->prev = node;
+ }
+ fork_fd_list_head = node;
+ gpr_mu_unlock(&fork_fd_list_mu);
+}
+
+static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
+ if (grpc_core::Fork::Enabled()) {
+ fd->fork_fd_list =
+ static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->fd = fd;
+ fd->fork_fd_list->cached_wakeup_fd = nullptr;
+ fork_fd_list_add_node(fd->fork_fd_list);
+ }
+}
+
+static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
+ if (grpc_core::Fork::Enabled()) {
+ fd->fork_fd_list =
+ static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
+ fd->fork_fd_list->cached_wakeup_fd = fd;
+ fd->fork_fd_list->fd = nullptr;
+ fork_fd_list_add_node(fd->fork_fd_list);
+ }
+}
+
+/*******************************************************************************
* fd_posix.c
*/
@@ -319,6 +394,7 @@ static void unref_by(grpc_fd* fd, int n) {
if (old == n) {
gpr_mu_destroy(&fd->mu);
grpc_iomgr_unregister_object(&fd->iomgr_object);
+ fork_fd_list_remove_node(fd->fork_fd_list);
if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_free(fd);
} else {
@@ -347,6 +423,7 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
gpr_asprintf(&name2, "%s fd=%d", name, fd);
grpc_iomgr_register_object(&r->iomgr_object, name2);
gpr_free(name2);
+ fork_fd_list_add_grpc_fd(r);
return r;
}
@@ -822,6 +899,7 @@ static void pollset_destroy(grpc_pollset* pollset) {
GPR_ASSERT(!pollset_has_workers(pollset));
while (pollset->local_wakeup_cache) {
grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
+ fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
gpr_free(pollset->local_wakeup_cache);
pollset->local_wakeup_cache = next;
@@ -895,6 +973,7 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
gpr_malloc(sizeof(*worker.wakeup_fd)));
error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
if (error != GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
return error;
@@ -1705,6 +1784,10 @@ static void shutdown_engine(void) {
if (grpc_cv_wakeup_fds_enabled()) {
global_cv_fd_table_shutdown();
}
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_destroy(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
+ }
}
static const grpc_event_engine_vtable vtable = {
@@ -1742,6 +1825,26 @@ static const grpc_event_engine_vtable vtable = {
shutdown_engine,
};
+/* Called by the child process's post-fork handler to close open fds, including
+ * worker wakeup fds. This allows gRPC to shutdown in the child process without
+ * interfering with connections or RPCs ongoing in the parent. */
+static void reset_event_manager_on_fork() {
+ gpr_mu_lock(&fork_fd_list_mu);
+ while (fork_fd_list_head != nullptr) {
+ if (fork_fd_list_head->fd != nullptr) {
+ close(fork_fd_list_head->fd->fd);
+ fork_fd_list_head->fd->fd = -1;
+ } else {
+ close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
+ fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
+ close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
+ fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
+ }
+ fork_fd_list_head = fork_fd_list_head->next;
+ }
+ gpr_mu_unlock(&fork_fd_list_mu);
+}
+
const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!grpc_has_wakeup_fd()) {
gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
@@ -1750,6 +1853,11 @@ const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
return nullptr;
}
+ if (grpc_core::Fork::Enabled()) {
+ gpr_mu_init(&fork_fd_list_mu);
+ grpc_core::Fork::SetResetChildPollingEngineFunc(
+ reset_event_manager_on_fork);
+ }
return &vtable;
}