diff options
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.c | 39 |
1 files changed, 33 insertions, 6 deletions
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index d1752327a2..ba62d36507 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -113,6 +113,9 @@ struct grpc_fd { grpc_closure *on_done_closure; grpc_iomgr_object iomgr_object; + + /* The pollset that last noticed and notified that the fd is readable */ + grpc_pollset *read_notifier_pollset; }; /* Begin polling on an fd. @@ -134,7 +137,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, if got_read or got_write are 1, also does the become_{readable,writable} as appropriate. */ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec, - int got_read, int got_write); + int got_read, int got_write, + grpc_pollset *read_notifier_pollset); /* Return 1 if this fd is orphaned, 0 otherwise */ static bool fd_is_orphaned(grpc_fd *fd); @@ -301,6 +305,7 @@ static grpc_fd *fd_create(int fd, const char *name) { r->on_done_closure = NULL; r->closed = 0; r->released = 0; + r->read_notifier_pollset = NULL; char *name2; gpr_asprintf(&name2, "%s fd=%d", name, fd); @@ -316,6 +321,18 @@ static bool fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } +/* Return the read-notifier pollset */ +static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, + grpc_fd *fd) { + grpc_pollset *notifier = NULL; + + gpr_mu_lock(&fd->mu); + notifier = fd->read_notifier_pollset; + gpr_mu_unlock(&fd->mu); + + return notifier; +} + static void pollset_kick_locked(grpc_fd_watcher *watcher) { gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); @@ -444,6 +461,11 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } } +static void set_read_notifier_pollset_locked( + grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *read_notifier_pollset) { + fd->read_notifier_pollset = read_notifier_pollset; +} + static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_lock(&fd->mu); GPR_ASSERT(!fd->shutdown); @@ -519,7 +541,8 @@ static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, } static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, - int got_read, int got_write) { + int got_read, int got_write, + grpc_pollset *read_notifier_pollset) { int was_polling = 0; int kick = 0; grpc_fd *fd = watcher->fd; @@ -555,6 +578,9 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) { kick = 1; } + if (read_notifier_pollset != NULL) { + set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset); + } } if (got_write) { if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) { @@ -899,11 +925,11 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); } for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } } else if (r == 0) { for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } } else { if (pfds[0].revents & POLLIN_CHECK) { @@ -914,10 +940,10 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0); + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } else { fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, - pfds[i].revents & POLLOUT_CHECK); + pfds[i].revents & POLLOUT_CHECK, pollset); } } } @@ -1181,6 +1207,7 @@ static const grpc_event_engine_vtable vtable = { .fd_shutdown = fd_shutdown, .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, + .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, |