diff options
Diffstat (limited to 'src/core/iomgr/pollset_posix.c')
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 145 |
1 files changed, 92 insertions, 53 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index c8646af615..d3a9193af1 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -55,22 +55,60 @@ #include <grpc/support/useful.h> GPR_TLS_DECL(g_current_thread_poller); +GPR_TLS_DECL(g_current_thread_worker); -void grpc_pollset_kick(grpc_pollset *p) { - if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p && p->counter) { - p->vtable->kick(p); - } +static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev->next = worker->next; + worker->next->prev = worker->prev; +} + +int grpc_pollset_has_workers(grpc_pollset *p) { + return p->root_worker.next != &p->root_worker; } -void grpc_pollset_force_kick(grpc_pollset *p) { - if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { - grpc_pollset_kick_kick(&p->kick_state); +static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { + if (grpc_pollset_has_workers(p)) { + grpc_pollset_worker *w = p->root_worker.next; + remove_worker(p, w); + return w; + } else { + return NULL; } } -static void kick_using_pollset_kick(grpc_pollset *p) { - if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { - grpc_pollset_kick_kick(&p->kick_state); +static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->next = &p->root_worker; + worker->prev = worker->next->prev; + worker->prev->next = worker->next->prev = worker; +} + +static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev = &p->root_worker; + worker->next = worker->prev->next; + worker->prev->next = worker->next->prev = worker; +} + +void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { + if (specific_worker != NULL) { + if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { + for (specific_worker = p->root_worker.next; + specific_worker != &p->root_worker; + specific_worker = specific_worker->next) { + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + } + p->kicked_without_pollers = 1; + } else if (gpr_tls_get(&g_current_thread_worker) != + (gpr_intptr)specific_worker) { + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + } + } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { + specific_worker = pop_front_worker(p); + if (specific_worker != NULL) { + push_back_worker(p, specific_worker); + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + } else { + p->kicked_without_pollers = 1; + } } } @@ -78,16 +116,12 @@ static void kick_using_pollset_kick(grpc_pollset *p) { void grpc_pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); - - /* Initialize kick fd state */ - grpc_pollset_kick_global_init(); + grpc_wakeup_fd_global_init(); } void grpc_pollset_global_shutdown(void) { - /* destroy the kick pipes */ - grpc_pollset_kick_global_destroy(); - gpr_tls_destroy(&g_current_thread_poller); + grpc_wakeup_fd_global_destroy(); } /* main interface */ @@ -96,7 +130,7 @@ static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); void grpc_pollset_init(grpc_pollset *pollset) { gpr_mu_init(&pollset->mu); - grpc_pollset_kick_init(&pollset->kick_state); + pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->in_flight_cbs = 0; pollset->shutting_down = 0; pollset->called_shutdown = 0; @@ -134,27 +168,44 @@ static void finish_shutdown(grpc_pollset *pollset) { pollset->shutdown_done_cb(pollset->shutdown_done_arg); } -int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { +int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline) { /* pollset->mu already held */ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + int added_worker = 0; if (gpr_time_cmp(now, deadline) > 0) { return 0; } + /* this must happen before we (potentially) drop pollset->mu */ + worker->next = worker->prev = NULL; + /* TODO(ctiller): pool these */ + grpc_wakeup_fd_init(&worker->wakeup_fd); if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) { - return 1; + goto done; } if (grpc_alarm_check(&pollset->mu, now, &deadline)) { - return 1; + goto done; } if (pollset->shutting_down) { - return 1; + goto done; + } + if (!pollset->kicked_without_pollers) { + push_front_worker(pollset, worker); + added_worker = 1; + gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); + pollset->vtable->maybe_work(pollset, worker, deadline, now, 1); + gpr_tls_set(&g_current_thread_poller, 0); + } else { + pollset->kicked_without_pollers = 0; + } +done: + grpc_wakeup_fd_destroy(&worker->wakeup_fd); + if (added_worker) { + remove_worker(pollset, worker); } - gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); - pollset->vtable->maybe_work(pollset, deadline, now, 1); - gpr_tls_set(&g_current_thread_poller, 0); if (pollset->shutting_down) { - if (pollset->counter > 0) { - grpc_pollset_kick(pollset); + if (grpc_pollset_has_workers(pollset)) { + grpc_pollset_kick(pollset, NULL); } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); @@ -177,15 +228,13 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, GPR_ASSERT(!pollset->shutting_down); pollset->shutting_down = 1; if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && - pollset->counter == 0) { + !grpc_pollset_has_workers(pollset)) { pollset->called_shutdown = 1; call_shutdown = 1; } pollset->shutdown_done_cb = shutdown_done; pollset->shutdown_done_arg = shutdown_done_arg; - if (pollset->counter > 0) { - grpc_pollset_kick(pollset); - } + grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); gpr_mu_unlock(&pollset->mu); if (call_shutdown) { @@ -196,8 +245,8 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, void grpc_pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(pollset->shutting_down); GPR_ASSERT(pollset->in_flight_cbs == 0); + GPR_ASSERT(!grpc_pollset_has_workers(pollset)); pollset->vtable->destroy(pollset); - grpc_pollset_kick_destroy(&pollset->kick_state); gpr_mu_destroy(&pollset->mu); } @@ -248,8 +297,8 @@ static void basic_do_promote(void *args, int success) { gpr_mu_lock(&pollset->mu); /* First we need to ensure that nobody is polling concurrently */ - if (pollset->counter != 0) { - grpc_pollset_kick(pollset); + if (grpc_pollset_has_workers(pollset)) { + grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); grpc_iomgr_add_callback(&up_args->promotion_closure); gpr_mu_unlock(&pollset->mu); return; @@ -264,7 +313,8 @@ static void basic_do_promote(void *args, int success) { pollset->in_flight_cbs--; if (pollset->shutting_down) { /* We don't care about this pollset anymore. */ - if (pollset->in_flight_cbs == 0 && pollset->counter == 0 && !pollset->called_shutdown) { + if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { + GPR_ASSERT(!grpc_pollset_has_workers(pollset)); pollset->called_shutdown = 1; do_shutdown_cb = 1; } @@ -307,7 +357,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, GPR_ASSERT(fd); if (fd == pollset->data.ptr) goto exit; - if (!pollset->counter) { + if (!grpc_pollset_has_workers(pollset)) { /* Fast path -- no in flight cbs */ /* TODO(klempner): Comment this out and fix any test failures or establish * they are due to timing issues */ @@ -343,7 +393,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, up_args->promotion_closure.cb_arg = up_args; grpc_iomgr_add_callback(&up_args->promotion_closure); - grpc_pollset_kick(pollset); + grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); exit: if (and_unlock_pollset) { @@ -365,12 +415,12 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, } static void basic_pollset_maybe_work(grpc_pollset *pollset, + grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now, int allow_synchronous_callback) { struct pollfd pfd[2]; grpc_fd *fd; grpc_fd_watcher fd_watcher; - grpc_kick_fd_info *kfd; int timeout; int r; int nfds; @@ -387,16 +437,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, fd = pollset->data.ptr = NULL; } timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); - kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state); - if (kfd == NULL) { - /* Already kicked */ - return; - } - pfd[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd); + pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); pfd[0].events = POLLIN; pfd[0].revents = 0; nfds = 1; - pollset->counter++; if (fd) { pfd[1].fd = fd->fd; pfd[1].revents = 0; @@ -428,7 +472,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, /* do nothing */ } else { if (pfd[0].revents & POLLIN) { - grpc_pollset_kick_consume(&pollset->kick_state, kfd); + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); } if (nfds > 1) { if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) { @@ -440,14 +484,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, } } - grpc_pollset_kick_post_poll(&pollset->kick_state, kfd); - gpr_mu_lock(&pollset->mu); - pollset->counter--; } static void basic_pollset_destroy(grpc_pollset *pollset) { - GPR_ASSERT(pollset->counter == 0); if (pollset->data.ptr != NULL) { GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); pollset->data.ptr = NULL; @@ -455,14 +495,13 @@ static void basic_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work, - kick_using_pollset_kick, basic_pollset_destroy, basic_pollset_destroy}; + basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work, + basic_pollset_destroy, basic_pollset_destroy}; static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { pollset->vtable = &basic_pollset; - pollset->counter = 0; pollset->data.ptr = fd_or_null; - if (fd_or_null) { + if (fd_or_null != NULL) { GRPC_FD_REF(fd_or_null, "basicpoll"); } } |