diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-10-01 07:53:56 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-10-01 07:53:56 -0700 |
commit | 988e37f1fc8542c205db569be0dd20f758c39164 (patch) | |
tree | 6278c00bdc22fb0c565c15c867ed5adce199917c /src/core | |
parent | 2b2a1ad6ca97da100fd469085f0ffef847e87e65 (diff) |
Allow fd_posix to force a re-evaluation of polling on wakeup
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/iomgr/fd_posix.c | 20 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 30 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.h | 5 |
4 files changed, 46 insertions, 11 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index b48b7f050a..806af9bc26 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -173,19 +173,19 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void pollset_kick_locked(grpc_pollset *pollset) { - gpr_mu_lock(GRPC_POLLSET_MU(pollset)); - grpc_pollset_kick(pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(pollset)); +static void pollset_kick_locked(grpc_fd_watcher *watcher) { + gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); + grpc_pollset_kick_ex(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) { - pollset_kick_locked(fd->inactive_watcher_root.next->pollset); + pollset_kick_locked(fd->inactive_watcher_root.next); } else if (fd->read_watcher) { - pollset_kick_locked(fd->read_watcher->pollset); + pollset_kick_locked(fd->read_watcher); } else if (fd->write_watcher) { - pollset_kick_locked(fd->write_watcher->pollset); + pollset_kick_locked(fd->write_watcher); } } @@ -199,13 +199,13 @@ static void wake_all_watchers_locked(grpc_fd *fd) { grpc_fd_watcher *watcher; for (watcher = fd->inactive_watcher_root.next; watcher != &fd->inactive_watcher_root; watcher = watcher->next) { - pollset_kick_locked(watcher->pollset); + pollset_kick_locked(watcher); } if (fd->read_watcher) { - pollset_kick_locked(fd->read_watcher->pollset); + pollset_kick_locked(fd->read_watcher); } if (fd->write_watcher && fd->write_watcher != fd->read_watcher) { - pollset_kick_locked(fd->write_watcher->pollset); + pollset_kick_locked(fd->write_watcher); } } diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 089aa4d717..a60aff2a09 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -46,6 +46,7 @@ typedef struct grpc_fd_watcher { struct grpc_fd_watcher *next; struct grpc_fd_watcher *prev; grpc_pollset *pollset; + grpc_pollset_worker *worker; grpc_fd *fd; } grpc_fd_watcher; @@ -126,6 +127,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, fd's current interest (such as epoll) do not need to call this function. MUST NOT be called with a pollset lock taken */ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, + grpc_pollset_worker *worker, gpr_uint32 read_mask, gpr_uint32 write_mask, grpc_fd_watcher *rec); /* Complete polling previously started with grpc_fd_begin_poll diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 82a82cc064..c88fff6b43 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -98,31 +98,59 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { +void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, gpr_uint32 flags) { /* pollset->mu already held */ if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { + GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; specific_worker = specific_worker->next) { grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); } p->kicked_without_pollers = 1; + return; } else if (gpr_tls_get(&g_current_thread_worker) != (gpr_intptr)specific_worker) { + if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { + specific_worker->reevaluate_polling_on_wakeup = 1; + } + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + return; + } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { + if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { + specific_worker->reevaluate_polling_on_wakeup = 1; + } grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + return; } } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { + GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); specific_worker = pop_front_worker(p); if (specific_worker != NULL) { + if (gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) { + push_back_worker(p, specific_worker); + specific_worker = pop_front_worker(p); + if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 && + gpr_tls_get(&g_current_thread_worker) == (gpr_intptr)specific_worker) { + push_back_worker(p, specific_worker); + return; + } + } push_back_worker(p, specific_worker); grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + return; } else { p->kicked_without_pollers = 1; + return; } } } +void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { + grpc_pollset_kick_ex(p, specific_worker, 0); +} + /* global state management */ void grpc_pollset_global_init(void) { diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 83c5258539..762582c79d 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -50,6 +50,7 @@ struct grpc_fd; typedef struct grpc_pollset_worker { grpc_wakeup_fd wakeup_fd; + int reevaluate_polling_on_wakeup; struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; } grpc_pollset_worker; @@ -111,6 +112,10 @@ void grpc_kick_drain(grpc_pollset *p); int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now); +#define GRPC_POLLSET_CAN_KICK_SELF 1 +#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 +void grpc_pollset_kick_ex(grpc_pollset *p, grpc_pollset_worker *specific_worker, gpr_uint32 flags); + /* turn a pollset into a multipoller: platform specific */ typedef void (*grpc_platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, |