aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar yang-g <yangg@google.com>2016-01-07 16:24:55 -0800
committerGravatar yang-g <yangg@google.com>2016-01-07 16:24:55 -0800
commit8fefe37693489e74a7f5fac263b1a626c4fc2c50 (patch)
tree51bb62c732a2b9b17720b3a40daaebbb01d4e9a5
parentc75e7de6cde35c43ac35395e0c033aa5fa394084 (diff)
Remove from all epoll sets when releasing an fd
-rw-r--r--src/core/iomgr/fd_posix.c23
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c65
-rw-r--r--src/core/iomgr/pollset_posix.h2
3 files changed, 79 insertions, 11 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 3e28f0ffb4..b4bd6bf226 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -101,6 +101,7 @@ static grpc_fd *alloc_fd(int fd) {
r->read_watcher = r->write_watcher = NULL;
r->on_done_closure = NULL;
r->closed = 0;
+ r->released = 0;
return r;
}
@@ -210,6 +211,16 @@ static int has_watchers(grpc_fd *fd) {
fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}
+static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ fd->closed = 1;
+ if (!fd->released) {
+ close(fd->fd);
+ } else {
+ grpc_remove_fd_from_all_epoll_sets(fd->fd);
+ }
+ grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
+}
+
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason) {
fd->on_done_closure = on_done;
@@ -222,11 +233,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
if (!has_watchers(fd)) {
- fd->closed = 1;
- if (!fd->released) {
- close(fd->fd);
- }
- grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
+ close_fd_locked(exec_ctx, fd);
} else {
wake_all_watchers_locked(fd);
}
@@ -416,11 +423,7 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
maybe_wake_one_watcher_locked(fd);
}
if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
- fd->closed = 1;
- if (!fd->released) {
- close(fd->fd);
- }
- grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
+ close_fd_locked(exec_ctx, fd);
}
gpr_mu_unlock(&fd->mu);
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 6e31efa013..dae33e42f2 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -43,9 +43,66 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
-#include "src/core/support/block_annotate.h"
#include "src/core/profiling/timers.h"
+#include "src/core/support/block_annotate.h"
+
+struct epoll_fd_list {
+ int *epoll_fds;
+ size_t count;
+ size_t capacity;
+};
+
+static struct epoll_fd_list epoll_fd_global_list;
+static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT;
+static gpr_mu epoll_fd_list_mu;
+
+static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); }
+
+static void add_epoll_fd_to_global_list(int epoll_fd) {
+ gpr_once_init(&init_epoll_fd_list_mu, init_mu);
+
+ gpr_mu_lock(&epoll_fd_list_mu);
+ if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) {
+ epoll_fd_global_list.capacity =
+ GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2);
+ epoll_fd_global_list.epoll_fds =
+ gpr_realloc(epoll_fd_global_list.epoll_fds,
+ epoll_fd_global_list.capacity * sizeof(int));
+ }
+ epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd;
+ gpr_mu_unlock(&epoll_fd_list_mu);
+}
+
+static void remove_epoll_fd_from_global_list(int epoll_fd) {
+ gpr_mu_lock(&epoll_fd_list_mu);
+ GPR_ASSERT(epoll_fd_global_list.count > 0);
+ for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
+ if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) {
+ epoll_fd_global_list.epoll_fds[i] =
+ epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)];
+ break;
+ }
+ }
+ gpr_mu_unlock(&epoll_fd_list_mu);
+}
+
+void grpc_remove_fd_from_all_epoll_sets(int fd) {
+ int err;
+ gpr_mu_lock(&epoll_fd_list_mu);
+ if (epoll_fd_global_list.count == 0) {
+ return;
+ }
+ for (size_t i = 0; i < epoll_fd_global_list.count; i++) {
+ err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL);
+ if (err < 0 && errno != ENOENT) {
+ gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd,
+ strerror(errno));
+ }
+ }
+ gpr_mu_unlock(&epoll_fd_list_mu);
+}
typedef struct {
grpc_pollset *pollset;
@@ -211,6 +268,7 @@ static void multipoll_with_epoll_pollset_finish_shutdown(
static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
pollset_hdr *h = pollset->data.ptr;
close(h->epoll_fd);
+ remove_epoll_fd_from_global_list(h->epoll_fd);
gpr_free(h);
}
@@ -236,6 +294,7 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
abort();
}
+ add_epoll_fd_to_global_list(h->epoll_fd);
ev.events = (uint32_t)(EPOLLIN | EPOLLET);
ev.data.ptr = NULL;
@@ -255,4 +314,8 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
epoll_become_multipoller;
+#else /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */
+
+void grpc_remove_fd_from_all_epoll_sets(int fd) {}
+
#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 78fc27d2b3..8b1616a101 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -139,6 +139,8 @@ void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx,
* be locked) */
int grpc_pollset_has_workers(grpc_pollset *pollset);
+void grpc_remove_fd_from_all_epoll_sets(int fd);
+
/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;