diff options
Diffstat (limited to 'src/core/iomgr/pollset_posix.c')
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 179 |
1 files changed, 111 insertions, 68 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index f3e424e83c..82a82cc064 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -47,6 +47,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> @@ -56,8 +57,16 @@ GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); +/** Default poll() function - a pointer so that it can be overridden by some + * tests */ grpc_poll_function_type grpc_poll_function = poll; +/** The alarm system needs to be able to wakeup 'some poller' sometimes + * (specifically when a new alarm needs to be triggered earlier than the next + * alarm 'epoch'). + * This wakeup_fd gives us something to alert on when such a case occurs. */ +grpc_wakeup_fd grpc_global_wakeup_fd; + static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next; worker->next->prev = worker->prev; @@ -118,12 +127,20 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { void grpc_pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); + gpr_tls_init(&g_current_thread_worker); grpc_wakeup_fd_global_init(); + grpc_wakeup_fd_init(&grpc_global_wakeup_fd); } void grpc_pollset_global_shutdown(void) { - gpr_tls_destroy(&g_current_thread_poller); + grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); grpc_wakeup_fd_global_destroy(); + gpr_tls_destroy(&g_current_thread_poller); + gpr_tls_destroy(&g_current_thread_worker); +} + +void grpc_kick_poller(void) { + grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } /* main interface */ @@ -136,12 +153,14 @@ void grpc_pollset_init(grpc_pollset *pollset) { pollset->in_flight_cbs = 0; pollset->shutting_down = 0; pollset->called_shutdown = 0; + pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; become_basic_pollset(pollset, NULL); } -void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { +void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd) { gpr_mu_lock(&pollset->mu); - pollset->vtable->add_fd(pollset, fd, 1); + pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -152,9 +171,10 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { #endif } -void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { +void grpc_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd) { gpr_mu_lock(&pollset->mu); - pollset->vtable->del_fd(pollset, fd, 1); + pollset->vtable->del_fd(exec_ctx, pollset, fd, 1); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -165,23 +185,30 @@ void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { #endif } -static void finish_shutdown(grpc_pollset *pollset) { +static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); pollset->vtable->finish_shutdown(pollset); - pollset->shutdown_done_cb(pollset->shutdown_done_arg); + grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, 1); } -void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec now, gpr_timespec deadline) { +void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker *worker, gpr_timespec now, + gpr_timespec deadline) { /* pollset->mu already held */ int added_worker = 0; + int locked = 1; /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; /* TODO(ctiller): pool these */ grpc_wakeup_fd_init(&worker->wakeup_fd); - if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) { + if (!grpc_pollset_has_workers(pollset) && + !grpc_closure_list_empty(pollset->idle_jobs)) { + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); goto done; } - if (grpc_alarm_check(&pollset->mu, now, &deadline)) { + if (grpc_alarm_check(exec_ctx, now, &deadline)) { + gpr_mu_unlock(&pollset->mu); + locked = 0; goto done; } if (pollset->shutting_down) { @@ -190,19 +217,28 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, if (pollset->in_flight_cbs) { /* Give do_promote priority so we don't starve it out */ gpr_mu_unlock(&pollset->mu); - gpr_mu_lock(&pollset->mu); + locked = 0; goto done; } if (!pollset->kicked_without_pollers) { push_front_worker(pollset, worker); added_worker = 1; gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); - pollset->vtable->maybe_work(pollset, worker, deadline, now, 1); + gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker); + pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, deadline, + now); + locked = 0; gpr_tls_set(&g_current_thread_poller, 0); + gpr_tls_set(&g_current_thread_worker, 0); } else { pollset->kicked_without_pollers = 0; } done: + if (!locked) { + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->mu); + locked = 1; + } grpc_wakeup_fd_destroy(&worker->wakeup_fd); if (added_worker) { remove_worker(pollset, worker); @@ -213,19 +249,24 @@ done: } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); - finish_shutdown(pollset); + finish_shutdown(exec_ctx, pollset); + grpc_exec_ctx_flush(exec_ctx); /* Continuing to access pollset here is safe -- it is the caller's * responsibility to not destroy when it has outstanding calls to * grpc_pollset_work. * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ gpr_mu_lock(&pollset->mu); + } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { + gpr_mu_unlock(&pollset->mu); + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->mu); } } } -void grpc_pollset_shutdown(grpc_pollset *pollset, - void (*shutdown_done)(void *arg), - void *shutdown_done_arg) { +void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_closure *closure) { int call_shutdown = 0; gpr_mu_lock(&pollset->mu); GPR_ASSERT(!pollset->shutting_down); @@ -235,13 +276,15 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, pollset->called_shutdown = 1; call_shutdown = 1; } - pollset->shutdown_done_cb = shutdown_done; - pollset->shutdown_done_arg = shutdown_done_arg; + if (!grpc_pollset_has_workers(pollset)) { + grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); + } + pollset->shutdown_done = closure; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); gpr_mu_unlock(&pollset->mu); if (call_shutdown) { - finish_shutdown(pollset); + finish_shutdown(exec_ctx, pollset); } } @@ -267,7 +310,7 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, } timeout = gpr_time_sub(deadline, now); return gpr_time_to_millis(gpr_time_add( - timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN))); + timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); } /* @@ -279,15 +322,14 @@ typedef struct grpc_unary_promote_args { const grpc_pollset_vtable *original_vtable; grpc_pollset *pollset; grpc_fd *fd; - grpc_iomgr_closure promotion_closure; + grpc_closure promotion_closure; } grpc_unary_promote_args; -static void basic_do_promote(void *args, int success) { +static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) { grpc_unary_promote_args *up_args = args; const grpc_pollset_vtable *original_vtable = up_args->original_vtable; grpc_pollset *pollset = up_args->pollset; grpc_fd *fd = up_args->fd; - int do_shutdown_cb = 0; /* * This is quite tricky. There are a number of cases to keep in mind here: @@ -300,12 +342,7 @@ static void basic_do_promote(void *args, int success) { gpr_mu_lock(&pollset->mu); /* First we need to ensure that nobody is polling concurrently */ - if (grpc_pollset_has_workers(pollset)) { - grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - grpc_iomgr_add_callback(&up_args->promotion_closure); - gpr_mu_unlock(&pollset->mu); - return; - } + GPR_ASSERT(!grpc_pollset_has_workers(pollset)); gpr_free(up_args); /* At this point the pollset may no longer be a unary poller. In that case @@ -317,21 +354,20 @@ static void basic_do_promote(void *args, int success) { if (pollset->shutting_down) { /* We don't care about this pollset anymore. */ if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { - GPR_ASSERT(!grpc_pollset_has_workers(pollset)); - pollset->called_shutdown = 1; - do_shutdown_cb = 1; + finish_shutdown(exec_ctx, pollset); } } else if (grpc_fd_is_orphaned(fd)) { /* Don't try to add it to anything, we'll drop our ref on it below */ } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd(pollset, fd, 0); + pollset->vtable->add_fd(exec_ctx, pollset, fd, 0); } else if (fd != pollset->data.ptr) { grpc_fd *fds[2]; fds[0] = pollset->data.ptr; fds[1] = fd; if (fds[0] && !grpc_fd_is_orphaned(fds[0])) { - grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_platform_become_multipoller(exec_ctx, pollset, fds, + GPR_ARRAY_SIZE(fds)); GRPC_FD_UNREF(fds[0], "basicpoll"); } else { /* old fd is orphaned and we haven't cleaned it up until now, so remain a @@ -346,16 +382,12 @@ static void basic_do_promote(void *args, int success) { gpr_mu_unlock(&pollset->mu); - if (do_shutdown_cb) { - pollset->shutdown_done_cb(pollset->shutdown_done_arg); - } - /* Matching ref in basic_pollset_add_fd */ GRPC_FD_UNREF(fd, "basicpoll_add"); } -static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, - int and_unlock_pollset) { +static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd, int and_unlock_pollset) { grpc_unary_promote_args *up_args; GPR_ASSERT(fd); if (fd == pollset->data.ptr) goto exit; @@ -372,7 +404,8 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, pollset->data.ptr = fd; GRPC_FD_REF(fd, "basicpoll"); } else if (!grpc_fd_is_orphaned(fds[0])) { - grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_platform_become_multipoller(exec_ctx, pollset, fds, + GPR_ARRAY_SIZE(fds)); GRPC_FD_UNREF(fds[0], "basicpoll"); } else { /* old fd is orphaned and we haven't cleaned it up until now, so remain a @@ -389,13 +422,13 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, GRPC_FD_REF(fd, "basicpoll_add"); pollset->in_flight_cbs++; up_args = gpr_malloc(sizeof(*up_args)); - up_args->pollset = pollset; up_args->fd = fd; up_args->original_vtable = pollset->vtable; + up_args->pollset = pollset; up_args->promotion_closure.cb = basic_do_promote; up_args->promotion_closure.cb_arg = up_args; - grpc_iomgr_add_callback(&up_args->promotion_closure); + grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); exit: @@ -404,8 +437,8 @@ exit: } } -static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, - int and_unlock_pollset) { +static void basic_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd, int and_unlock_pollset) { GPR_ASSERT(fd); if (fd == pollset->data.ptr) { GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); @@ -417,11 +450,12 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, } } -static void basic_pollset_maybe_work(grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now, - int allow_synchronous_callback) { - struct pollfd pfd[2]; +static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, + gpr_timespec now) { + struct pollfd pfd[3]; grpc_fd *fd; grpc_fd_watcher fd_watcher; int timeout; @@ -434,31 +468,38 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, fd = pollset->data.ptr = NULL; } timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); - pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); + pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); pfd[0].events = POLLIN; pfd[0].revents = 0; - nfds = 1; + pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); + pfd[1].events = POLLIN; + pfd[1].revents = 0; + nfds = 2; if (fd) { - pfd[1].fd = fd->fd; - pfd[1].revents = 0; + pfd[2].fd = fd->fd; + pfd[2].revents = 0; gpr_mu_unlock(&pollset->mu); - pfd[1].events = + pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher); - if (pfd[1].events != 0) { + if (pfd[2].events != 0) { nfds++; } } else { gpr_mu_unlock(&pollset->mu); } + /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid + even going into the blocking annotation if possible */ /* poll fd count (argument 2) is shortened by one if we have no events to poll on - such that it only includes the kicker */ + GRPC_SCHEDULING_START_BLOCKING_REGION; r = grpc_poll_function(pfd, nfds, timeout); + GRPC_SCHEDULING_END_BLOCKING_REGION; GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r); if (fd) { - grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, - pfd[1].revents & POLLOUT); + grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN, + pfd[2].revents & POLLOUT); } if (r < 0) { @@ -469,19 +510,20 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, /* do nothing */ } else { if (pfd[0].revents & POLLIN) { + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + } + if (pfd[1].revents & POLLIN) { grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); } - if (nfds > 1) { - if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) { - grpc_fd_become_readable(fd, allow_synchronous_callback); + if (nfds > 2) { + if (pfd[2].revents & (POLLIN | POLLHUP | POLLERR)) { + grpc_fd_become_readable(exec_ctx, fd); } - if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) { - grpc_fd_become_writable(fd, allow_synchronous_callback); + if (pfd[2].revents & (POLLOUT | POLLHUP | POLLERR)) { + grpc_fd_become_writable(exec_ctx, fd); } } } - - gpr_mu_lock(&pollset->mu); } static void basic_pollset_destroy(grpc_pollset *pollset) { @@ -492,8 +534,9 @@ static void basic_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work, - basic_pollset_destroy, basic_pollset_destroy}; + basic_pollset_add_fd, basic_pollset_del_fd, + basic_pollset_maybe_work_and_unlock, basic_pollset_destroy, + basic_pollset_destroy}; static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { pollset->vtable = &basic_pollset; |