diff options
author | Sree Kuchibhotla <sreek@google.com> | 2016-06-04 14:01:03 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2016-06-04 14:01:03 -0700 |
commit | 79a6233bef501e1f51250351a55abc61cc024827 (patch) | |
tree | 5dd15de613a364db7c8f3683aef30b19de584ebf /src/core | |
parent | 88ee12fbe98685e736366d6a151a10ed103f8979 (diff) |
Fix a few bugs in ev_epoll_linux.c
1. pollset_add_fd: Add fd to epoll set if fd->polling_island == NULL
2. close(fd) in fd_orphan instead of polling_island_remove_fd_locked()
since fd->polling_island may be NULL
3. If pollset work() is interrupted, do a zero timeout epoll_wait().
pollset_work may be called without a polling island
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 125 |
1 files changed, 78 insertions, 47 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 0fb1ccfa0f..3aa26109f2 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -72,9 +72,14 @@ struct grpc_fd { gpr_atm refst; gpr_mu mu; + + /* Indicates that the fd is shutdown and that any pending read/write closures + should fail */ bool shutdown; - int closed; - bool released; + + /* The fd is either closed or we relinquished control of it. In either cases, + this indicates that the 'fd' on this structure is no longer valid */ + bool orphaned; grpc_closure *read_closure; grpc_closure *write_closure; @@ -251,16 +256,13 @@ static void polling_island_remove_all_fds_locked(polling_island *pi, /* The caller is expected to hold pi->mu lock before calling this function */ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd, - bool close_fd, bool remove_fd_ref) { + bool is_fd_closed) { int err; size_t i; - /* Calling close() on the fd will automatically remove it from the epoll set. - If not calling close(), the fd must be explicitly removed from the epoll - set */ - if (close_fd) { - close(fd->fd); - } else { + /* If fd is already closed, then it would have been automatically been removed + from the epoll set */ + if (!is_fd_closed) { err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); if (err < 0 && errno != ENOENT) { gpr_log(GPR_ERROR, "epoll_ctl delete for fd: %d failed with error; %s", @@ -271,9 +273,7 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd, for (i = 0; i < pi->fd_cnt; i++) { if (pi->fds[i] == fd) { pi->fds[i] = pi->fds[--pi->fd_cnt]; - if (remove_fd_ref) { - GRPC_FD_UNREF(fd, "polling_island"); - } + GRPC_FD_UNREF(fd, "polling_island"); break; } } @@ -644,8 +644,7 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd->polling_island = NULL; new_fd->freelist_next = NULL; new_fd->on_done_closure = NULL; - new_fd->closed = 0; - new_fd->released = false; + new_fd->orphaned = false; gpr_mu_unlock(&new_fd->mu); @@ -666,7 +665,7 @@ static bool fd_is_orphaned(grpc_fd *fd) { static int fd_wrapped_fd(grpc_fd *fd) { int ret_fd = -1; gpr_mu_lock(&fd->mu); - if (!fd->released && !fd->closed) { + if (!fd->orphaned) { ret_fd = fd->fd; } gpr_mu_unlock(&fd->mu); @@ -678,34 +677,35 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, const char *reason) { /* TODO(sreek) In ev_poll_posix.c,the lock is acquired a little later. Why? */ + bool is_fd_closed = false; gpr_mu_lock(&fd->mu); fd->on_done_closure = on_done; /* If release_fd is not NULL, we should be relinquishing control of the file descriptor fd->fd (but we still own the grpc_fd structure). */ - fd->released = release_fd != NULL; - if (!fd->released) { - shutdown(fd->fd, SHUT_RDWR); - } else { + if (release_fd != NULL) { *release_fd = fd->fd; + } else { + close(fd->fd); + is_fd_closed = true; } - REF_BY(fd, 1, reason); /* Remove active status, but keep referenced */ - fd->closed = 1; + fd->orphaned = true; + + /* Remove the active status but keep referenced. We want this grpc_fd struct + to be alive (and not added to freelist) until the end of this function */ + REF_BY(fd, 1, reason); /* Remove the fd from the polling island: - Update the fd->polling_island to point to the latest polling island - - Remove the fd from the polling island. Also, call close() on the file - descriptor fd->fd ONLY if we haven't relinquised control (i.e - fd->released is 'false') - - Decrement the ref count on the polling island and det fd->polling_island - to NULL */ + - Remove the fd from the polling island. + - Remove a ref to the polling island and set fd->polling_island to NULL */ gpr_mu_lock(&fd->pi_mu); if (fd->polling_island != NULL) { fd->polling_island = polling_island_update_and_lock(fd->polling_island, 1, 0); - polling_island_remove_fd_locked(fd->polling_island, fd, !fd->released, - true); + polling_island_remove_fd_locked(fd->polling_island, fd, is_fd_closed); + polling_island_unref_and_unlock(fd->polling_island, 1); fd->polling_island = NULL; } @@ -839,17 +839,20 @@ static void pollset_kick(grpc_pollset *p, grpc_pollset_worker *worker = specific_worker; if (worker != NULL) { if (worker == GRPC_POLLSET_KICK_BROADCAST) { - GPR_TIMER_BEGIN("pollset_kick.broadcast", 0); + gpr_log(GPR_DEBUG, "pollset_kick: broadcast!"); if (pollset_has_workers(p)) { + GPR_TIMER_BEGIN("pollset_kick.broadcast", 0); for (worker = p->root_worker.next; worker != &p->root_worker; worker = worker->next) { pthread_kill(worker->pt_id, SIGUSR1); } } else { + gpr_log(GPR_DEBUG, "pollset_kick: (broadcast) Kicked without pollers"); p->kicked_without_pollers = true; } GPR_TIMER_END("pollset_kick.broadcast", 0); } else { + gpr_log(GPR_DEBUG, "pollset_kick: kicked kicked_specifically"); GPR_TIMER_MARK("kicked_specifically", 0); worker->kicked_specifically = true; pthread_kill(worker->pt_id, SIGUSR1); @@ -860,9 +863,11 @@ static void pollset_kick(grpc_pollset *p, if (worker != NULL) { GPR_TIMER_MARK("finally_kick", 0); push_back_worker(p, worker); + gpr_log(GPR_DEBUG, "pollset_kick: anonymous kick"); pthread_kill(worker->pt_id, SIGUSR1); } else { GPR_TIMER_MARK("kicked_no_pollers", 0); + gpr_log(GPR_DEBUG, "pollset_kick: kicked without pollers"); p->kicked_without_pollers = true; } } @@ -935,6 +940,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; int epoll_fd = -1; int ep_rv; + gpr_log(GPR_DEBUG, "pollset_work_and_unlock: Entering.."); GPR_TIMER_BEGIN("pollset_work_and_unlock", 0); /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the @@ -949,6 +955,16 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, pollset->polling_island = polling_island_update_and_lock(pollset->polling_island, 1, 0); epoll_fd = pollset->polling_island->epoll_fd; + if (pollset->polling_island->fd_cnt == 0) { + gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_fd: %d, No other fds", + epoll_fd); + } + for (size_t i = 0; i < pollset->polling_island->fd_cnt; i++) { + gpr_log(GPR_DEBUG, + "pollset_work_and_unlock: epoll_fd: %d, fd_count: %d, fd[%d]: %d", + epoll_fd, pollset->polling_island->fd_cnt, i, + pollset->polling_island->fds[i]->fd); + } gpr_mu_unlock(&pollset->polling_island->mu); } @@ -958,36 +974,47 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, /* If epoll_fd == -1, this is a blank pollset and does not have any fds yet */ if (epoll_fd != -1) { do { + gpr_timespec before_epoll = gpr_now(GPR_CLOCK_PRECISE); + gpr_log(GPR_DEBUG, "pollset_work_and_unlock: epoll_wait()...."); ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask); + gpr_timespec after_epoll = gpr_now(GPR_CLOCK_PRECISE); + int dur = gpr_time_to_millis(gpr_time_sub(after_epoll, before_epoll)); + gpr_log(GPR_DEBUG, + "pollset_work_and_unlock: DONE epoll_wait() : %d ms, ep_rv: %d", + dur, ep_rv); if (ep_rv < 0) { if (errno != EINTR) { /* TODO (sreek) - Check for bad file descriptor error */ gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno)); + } else { + gpr_log(GPR_DEBUG, "pollset_work_and_unlock: 0-timeout epoll_wait()"); + ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); + gpr_log(GPR_DEBUG, "pollset_work_and_unlock: ep_rv: %d", ep_rv); } - } else { - int i; - for (i = 0; i < ep_rv; ++i) { - grpc_fd *fd = ep_ev[i].data.ptr; - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write_ev = ep_ev[i].events & EPOLLOUT; - if (fd == NULL) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); - } else { - if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd); - } - if (write_ev || cancel) { - fd_become_writable(exec_ctx, fd); - } + } + + int i; + for (i = 0; i < ep_rv; ++i) { + grpc_fd *fd = ep_ev[i].data.ptr; + int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); + int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); + int write_ev = ep_ev[i].events & EPOLLOUT; + if (fd == NULL) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } else { + if (read_ev || cancel) { + fd_become_readable(exec_ctx, fd); + } + if (write_ev || cancel) { + fd_become_writable(exec_ctx, fd); } } } } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); } - + gpr_log(GPR_DEBUG, "pollset_work_and_unlock: Leaving.."); GPR_TIMER_END("pollset_work_and_unlock", 0); } @@ -1060,7 +1087,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { GPR_TIMER_BEGIN("pollset_work", 0); - + gpr_log(GPR_DEBUG, "pollset_work: enter"); int timeout_ms = poll_deadline_to_millis_timeout(deadline, now); sigset_t new_mask; @@ -1079,6 +1106,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, work that needs attention like an event on the completion queue or an alarm */ GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0); + gpr_log(GPR_INFO, "pollset_work: kicked without pollers.."); pollset->kicked_without_pollers = 0; } else if (!pollset->shutting_down) { sigemptyset(&new_mask); @@ -1113,12 +1141,14 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); } + gpr_log(GPR_DEBUG, "pollset_work(): leaving"); *worker_hdl = NULL; GPR_TIMER_END("pollset_work", 0); } static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { + gpr_log(GPR_DEBUG, "pollset_add_fd: pollset: %p, fd: %d", pollset, fd->fd); /* TODO sreek - Check if we need to get a pollset->mu lock here */ gpr_mu_lock(&pollset->pi_mu); gpr_mu_lock(&fd->pi_mu); @@ -1146,6 +1176,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else if (fd->polling_island == NULL) { pi_new = polling_island_update_and_lock(pollset->polling_island, 1, 1); + polling_island_add_fds_locked(pollset->polling_island, &fd, 1, true); gpr_mu_unlock(&pi_new->mu); } else if (pollset->polling_island == NULL) { pi_new = polling_island_update_and_lock(fd->polling_island, 1, 1); |