aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/fd_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/fd_posix.c')
-rw-r--r--src/core/iomgr/fd_posix.c96
1 files changed, 77 insertions, 19 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 9c8133d2d4..63615ea25f 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -96,8 +96,10 @@ static grpc_fd *alloc_fd(int fd) {
gpr_atm_rel_store(&r->writest, NOT_READY);
gpr_atm_rel_store(&r->shutdown, 0);
r->fd = fd;
- r->watcher_root.next = r->watcher_root.prev = &r->watcher_root;
+ r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
+ &r->inactive_watcher_root;
r->freelist_next = NULL;
+ r->read_watcher = r->write_watcher = NULL;
return r;
}
@@ -147,14 +149,34 @@ int grpc_fd_is_orphaned(grpc_fd *fd) {
return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}
-static void wake_watchers(grpc_fd *fd) {
- grpc_fd_watcher *watcher;
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+ if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
+ grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset);
+ } else if (fd->read_watcher) {
+ grpc_pollset_force_kick(fd->read_watcher->pollset);
+ } else if (fd->write_watcher) {
+ grpc_pollset_force_kick(fd->write_watcher->pollset);
+ }
+}
+
+static void maybe_wake_one_watcher(grpc_fd *fd) {
gpr_mu_lock(&fd->watcher_mu);
- for (watcher = fd->watcher_root.next; watcher != &fd->watcher_root;
- watcher = watcher->next) {
+ maybe_wake_one_watcher_locked(fd);
+ gpr_mu_unlock(&fd->watcher_mu);
+}
+
+static void wake_all_watchers(grpc_fd *fd) {
+ grpc_fd_watcher *watcher;
+ for (watcher = fd->inactive_watcher_root.next;
+ watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
grpc_pollset_force_kick(watcher->pollset);
}
- gpr_mu_unlock(&fd->watcher_mu);
+ if (fd->read_watcher) {
+ grpc_pollset_force_kick(fd->read_watcher->pollset);
+ }
+ if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
+ grpc_pollset_force_kick(fd->write_watcher->pollset);
+ }
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
@@ -162,7 +184,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
fd->on_done_user_data = user_data;
shutdown(fd->fd, SHUT_RDWR);
ref_by(fd, 1); /* remove active status, but keep referenced */
- wake_watchers(fd);
+ wake_all_watchers(fd);
unref_by(fd, 2); /* drop the reference */
}
@@ -204,7 +226,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
set_ready call. NOTE: we don't have an ABA problem here,
since we should never have concurrent calls to the same
notify_on function. */
- wake_watchers(fd);
+ maybe_wake_one_watcher(fd);
return;
}
/* swap was unsuccessful due to an intervening set_ready call.
@@ -290,29 +312,65 @@ void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
gpr_uint32 read_mask, gpr_uint32 write_mask,
grpc_fd_watcher *watcher) {
+ gpr_uint32 mask = 0;
/* keep track of pollers that have requested our events, in case they change
*/
grpc_fd_ref(fd);
gpr_mu_lock(&fd->watcher_mu);
- watcher->next = &fd->watcher_root;
- watcher->prev = watcher->next->prev;
- watcher->next->prev = watcher->prev->next = watcher;
+ /* if there is nobody polling for read, but we need to, then start doing so */
+ if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+ fd->read_watcher = watcher;
+ mask |= read_mask;
+ }
+ /* if there is nobody polling for write, but we need to, then start doing so
+ */
+ if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+ fd->write_watcher = watcher;
+ mask |= write_mask;
+ }
+ /* if not polling, remember this watcher in case we need someone to later */
+ if (mask == 0) {
+ watcher->next = &fd->inactive_watcher_root;
+ watcher->prev = watcher->next->prev;
+ watcher->next->prev = watcher->prev->next = watcher;
+ }
watcher->pollset = pollset;
watcher->fd = fd;
gpr_mu_unlock(&fd->watcher_mu);
- return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) |
- (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0);
+ return mask;
}
-void grpc_fd_end_poll(grpc_fd_watcher *watcher) {
- gpr_mu_lock(&watcher->fd->watcher_mu);
- watcher->next->prev = watcher->prev;
- watcher->prev->next = watcher->next;
- gpr_mu_unlock(&watcher->fd->watcher_mu);
+void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
+ int was_polling = 0;
+ int kick = 0;
+ grpc_fd *fd = watcher->fd;
+
+ gpr_mu_lock(&fd->watcher_mu);
+ if (watcher == fd->read_watcher) {
+ /* remove read watcher, kick if we still need a read */
+ was_polling = 1;
+ kick = kick || !got_read;
+ fd->read_watcher = NULL;
+ }
+ if (watcher == fd->write_watcher) {
+ /* remove write watcher, kick if we still need a write */
+ was_polling = 1;
+ kick = kick || !got_write;
+ fd->write_watcher = NULL;
+ }
+ if (!was_polling) {
+ /* remove from inactive list */
+ watcher->next->prev = watcher->prev;
+ watcher->prev->next = watcher->next;
+ }
+ if (kick) {
+ maybe_wake_one_watcher_locked(fd);
+ }
+ gpr_mu_unlock(&fd->watcher_mu);
- grpc_fd_unref(watcher->fd);
+ grpc_fd_unref(fd);
}
void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {