diff options
author | yang-g <yangg@google.com> | 2016-01-07 16:24:55 -0800 |
---|---|---|
committer | yang-g <yangg@google.com> | 2016-01-07 16:24:55 -0800 |
commit | 8fefe37693489e74a7f5fac263b1a626c4fc2c50 (patch) | |
tree | 51bb62c732a2b9b17720b3a40daaebbb01d4e9a5 | |
parent | c75e7de6cde35c43ac35395e0c033aa5fa394084 (diff) |
Remove from all epoll sets when releasing an fd
-rw-r--r-- | src/core/iomgr/fd_posix.c | 23 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 65 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.h | 2 |
3 files changed, 79 insertions, 11 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 3e28f0ffb4..b4bd6bf226 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -101,6 +101,7 @@ static grpc_fd *alloc_fd(int fd) { r->read_watcher = r->write_watcher = NULL; r->on_done_closure = NULL; r->closed = 0; + r->released = 0; return r; } @@ -210,6 +211,16 @@ static int has_watchers(grpc_fd *fd) { fd->inactive_watcher_root.next != &fd->inactive_watcher_root; } +static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { + fd->closed = 1; + if (!fd->released) { + close(fd->fd); + } else { + grpc_remove_fd_from_all_epoll_sets(fd->fd); + } + grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1); +} + void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, const char *reason) { fd->on_done_closure = on_done; @@ -222,11 +233,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, gpr_mu_lock(&fd->mu); REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ if (!has_watchers(fd)) { - fd->closed = 1; - if (!fd->released) { - close(fd->fd); - } - grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1); + close_fd_locked(exec_ctx, fd); } else { wake_all_watchers_locked(fd); } @@ -416,11 +423,7 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, maybe_wake_one_watcher_locked(fd); } if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) { - fd->closed = 1; - if (!fd->released) { - close(fd->fd); - } - grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1); + close_fd_locked(exec_ctx, fd); } gpr_mu_unlock(&fd->mu); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 6e31efa013..dae33e42f2 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -43,9 +43,66 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" -#include "src/core/support/block_annotate.h" #include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" + +struct epoll_fd_list { + int *epoll_fds; + size_t count; + size_t capacity; +}; + +static struct epoll_fd_list epoll_fd_global_list; +static gpr_once init_epoll_fd_list_mu = GPR_ONCE_INIT; +static gpr_mu epoll_fd_list_mu; + +static void init_mu(void) { gpr_mu_init(&epoll_fd_list_mu); } + +static void add_epoll_fd_to_global_list(int epoll_fd) { + gpr_once_init(&init_epoll_fd_list_mu, init_mu); + + gpr_mu_lock(&epoll_fd_list_mu); + if (epoll_fd_global_list.count == epoll_fd_global_list.capacity) { + epoll_fd_global_list.capacity = + GPR_MAX((size_t)8, epoll_fd_global_list.capacity * 2); + epoll_fd_global_list.epoll_fds = + gpr_realloc(epoll_fd_global_list.epoll_fds, + epoll_fd_global_list.capacity * sizeof(int)); + } + epoll_fd_global_list.epoll_fds[epoll_fd_global_list.count++] = epoll_fd; + gpr_mu_unlock(&epoll_fd_list_mu); +} + +static void remove_epoll_fd_from_global_list(int epoll_fd) { + gpr_mu_lock(&epoll_fd_list_mu); + GPR_ASSERT(epoll_fd_global_list.count > 0); + for (size_t i = 0; i < epoll_fd_global_list.count; i++) { + if (epoll_fd == epoll_fd_global_list.epoll_fds[i]) { + epoll_fd_global_list.epoll_fds[i] = + epoll_fd_global_list.epoll_fds[--(epoll_fd_global_list.count)]; + break; + } + } + gpr_mu_unlock(&epoll_fd_list_mu); +} + +void grpc_remove_fd_from_all_epoll_sets(int fd) { + int err; + gpr_mu_lock(&epoll_fd_list_mu); + if (epoll_fd_global_list.count == 0) { + return; + } + for (size_t i = 0; i < epoll_fd_global_list.count; i++) { + err = epoll_ctl(epoll_fd_global_list.epoll_fds[i], EPOLL_CTL_DEL, fd, NULL); + if (err < 0 && errno != ENOENT) { + gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd, + strerror(errno)); + } + } + gpr_mu_unlock(&epoll_fd_list_mu); +} typedef struct { grpc_pollset *pollset; @@ -211,6 +268,7 @@ static void multipoll_with_epoll_pollset_finish_shutdown( static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { pollset_hdr *h = pollset->data.ptr; close(h->epoll_fd); + remove_epoll_fd_from_global_list(h->epoll_fd); gpr_free(h); } @@ -236,6 +294,7 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx, gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno)); abort(); } + add_epoll_fd_to_global_list(h->epoll_fd); ev.events = (uint32_t)(EPOLLIN | EPOLLET); ev.data.ptr = NULL; @@ -255,4 +314,8 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx, grpc_platform_become_multipoller_type grpc_platform_become_multipoller = epoll_become_multipoller; +#else /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */ + +void grpc_remove_fd_from_all_epoll_sets(int fd) {} + #endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */ diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 78fc27d2b3..8b1616a101 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -139,6 +139,8 @@ void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx, * be locked) */ int grpc_pollset_has_workers(grpc_pollset *pollset); +void grpc_remove_fd_from_all_epoll_sets(int fd); + /* override to allow tests to hook poll() usage */ typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; |