diff options
Diffstat (limited to 'src/core/iomgr/fd_posix.c')
-rw-r--r-- | src/core/iomgr/fd_posix.c | 96 |
1 files changed, 77 insertions, 19 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 9c8133d2d4..63615ea25f 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -96,8 +96,10 @@ static grpc_fd *alloc_fd(int fd) { gpr_atm_rel_store(&r->writest, NOT_READY); gpr_atm_rel_store(&r->shutdown, 0); r->fd = fd; - r->watcher_root.next = r->watcher_root.prev = &r->watcher_root; + r->inactive_watcher_root.next = r->inactive_watcher_root.prev = + &r->inactive_watcher_root; r->freelist_next = NULL; + r->read_watcher = r->write_watcher = NULL; return r; } @@ -147,14 +149,34 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void wake_watchers(grpc_fd *fd) { - grpc_fd_watcher *watcher; +static void maybe_wake_one_watcher_locked(grpc_fd *fd) { + if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) { + grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset); + } else if (fd->read_watcher) { + grpc_pollset_force_kick(fd->read_watcher->pollset); + } else if (fd->write_watcher) { + grpc_pollset_force_kick(fd->write_watcher->pollset); + } +} + +static void maybe_wake_one_watcher(grpc_fd *fd) { gpr_mu_lock(&fd->watcher_mu); - for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root; - watcher = watcher->next) { + maybe_wake_one_watcher_locked(fd); + gpr_mu_unlock(&fd->watcher_mu); +} + +static void wake_all_watchers(grpc_fd *fd) { + grpc_fd_watcher *watcher; + for (watcher = fd->inactive_watcher_root.next; + watcher != &fd->inactive_watcher_root; watcher = watcher->next) { grpc_pollset_force_kick(watcher->pollset); } - gpr_mu_unlock(&fd->watcher_mu); + if (fd->read_watcher) { + grpc_pollset_force_kick(fd->read_watcher->pollset); + } + if (fd->write_watcher && fd->write_watcher != fd->read_watcher) { + grpc_pollset_force_kick(fd->write_watcher->pollset); + } } void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { @@ -162,7 +184,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { fd->on_done_user_data = user_data; shutdown(fd->fd, SHUT_RDWR); ref_by(fd, 1); /* remove active status, but keep referenced */ - wake_watchers(fd); + wake_all_watchers(fd); unref_by(fd, 2); /* drop the reference */ } @@ -204,7 +226,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, set_ready call. NOTE: we don't have an ABA problem here, since we should never have concurrent calls to the same notify_on function. */ - wake_watchers(fd); + maybe_wake_one_watcher(fd); return; } /* swap was unsuccessful due to an intervening set_ready call. @@ -290,29 +312,65 @@ void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) { gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, gpr_uint32 read_mask, gpr_uint32 write_mask, grpc_fd_watcher *watcher) { + gpr_uint32 mask = 0; /* keep track of pollers that have requested our events, in case they change */ grpc_fd_ref(fd); gpr_mu_lock(&fd->watcher_mu); - watcher->next = &fd->watcher_root; - watcher->prev = watcher->next->prev; - watcher->next->prev = watcher->prev->next = watcher; + /* if there is nobody polling for read, but we need to, then start doing so */ + if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) { + fd->read_watcher = watcher; + mask |= read_mask; + } + /* if there is nobody polling for write, but we need to, then start doing so + */ + if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) { + fd->write_watcher = watcher; + mask |= write_mask; + } + /* if not polling, remember this watcher in case we need someone to later */ + if (mask == 0) { + watcher->next = &fd->inactive_watcher_root; + watcher->prev = watcher->next->prev; + watcher->next->prev = watcher->prev->next = watcher; + } watcher->pollset = pollset; watcher->fd = fd; gpr_mu_unlock(&fd->watcher_mu); - return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) | - (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0); + return mask; } -void grpc_fd_end_poll(grpc_fd_watcher *watcher) { - gpr_mu_lock(&watcher->fd->watcher_mu); - watcher->next->prev = watcher->prev; - watcher->prev->next = watcher->next; - gpr_mu_unlock(&watcher->fd->watcher_mu); +void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) { + int was_polling = 0; + int kick = 0; + grpc_fd *fd = watcher->fd; + + gpr_mu_lock(&fd->watcher_mu); + if (watcher == fd->read_watcher) { + /* remove read watcher, kick if we still need a read */ + was_polling = 1; + kick = kick || !got_read; + fd->read_watcher = NULL; + } + if (watcher == fd->write_watcher) { + /* remove write watcher, kick if we still need a write */ + was_polling = 1; + kick = kick || !got_write; + fd->write_watcher = NULL; + } + if (!was_polling) { + /* remove from inactive list */ + watcher->next->prev = watcher->prev; + watcher->prev->next = watcher->next; + } + if (kick) { + maybe_wake_one_watcher_locked(fd); + } + gpr_mu_unlock(&fd->watcher_mu); - grpc_fd_unref(watcher->fd); + grpc_fd_unref(fd); } void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) { |