diff options
Diffstat (limited to 'src/core/iomgr/pollset_posix.c')
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 138 |
1 files changed, 62 insertions, 76 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 0fe3f80d44..885cb29234 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -136,17 +136,14 @@ void grpc_pollset_init(grpc_pollset *pollset) { pollset->in_flight_cbs = 0; pollset->shutting_down = 0; pollset->called_shutdown = 0; - pollset->idle_jobs = NULL; - pollset->unlock_jobs = NULL; + pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; become_basic_pollset(pollset, NULL); } -void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { - if (fd->workqueue->wakeup_read_fd != fd) { - grpc_pollset_add_fd(pollset, fd->workqueue->wakeup_read_fd); - } +void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, + grpc_call_list *call_list) { gpr_mu_lock(&pollset->mu); - pollset->vtable->add_fd(pollset, fd, 1); + pollset->vtable->add_fd(pollset, fd, 1, call_list); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -157,9 +154,10 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { #endif } -void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { +void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, + grpc_call_list *call_list) { gpr_mu_lock(&pollset->mu); - pollset->vtable->del_fd(pollset, fd, 1); + pollset->vtable->del_fd(pollset, fd, 1, call_list); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -170,53 +168,27 @@ void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { #endif } -static void finish_shutdown(grpc_pollset *pollset) { +static void finish_shutdown(grpc_pollset *pollset, grpc_call_list *call_list) { pollset->vtable->finish_shutdown(pollset); - pollset->shutdown_done_cb(pollset->shutdown_done_arg); -} - -static void run_jobs(grpc_pollset *pollset, grpc_closure **root) { - grpc_closure *exec = *root; - *root = NULL; - gpr_mu_unlock(&pollset->mu); - while (exec != NULL) { - grpc_closure *next = exec->next; - exec->cb(exec->cb_arg, 1); - exec = next; - } - gpr_mu_lock(&pollset->mu); -} - -static void add_job(grpc_closure **root, grpc_closure *closure) { - closure->next = *root; - *root = closure; -} - -void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure) { - add_job(&pollset->idle_jobs, closure); -} - -void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure) { - add_job(&pollset->unlock_jobs, closure); + grpc_call_list_add(call_list, pollset->shutdown_done, 1); } void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec now, gpr_timespec deadline) { /* pollset->mu already held */ int added_worker = 0; + int locked = 1; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; /* TODO(ctiller): pool these */ grpc_wakeup_fd_init(&worker->wakeup_fd); - if (!grpc_pollset_has_workers(pollset) && pollset->idle_jobs != NULL) { - run_jobs(pollset, &pollset->idle_jobs); + if (!grpc_pollset_has_workers(pollset) && + !grpc_call_list_empty(pollset->idle_jobs)) { + grpc_call_list_move(&pollset->idle_jobs, &call_list); goto done; } - if (pollset->unlock_jobs != NULL) { - run_jobs(pollset, &pollset->unlock_jobs); - goto done; - } - if (grpc_alarm_check(&pollset->mu, now, &deadline)) { + if (grpc_alarm_check(now, &deadline, &call_list)) { goto done; } if (pollset->shutting_down) { @@ -225,19 +197,32 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, if (pollset->in_flight_cbs) { /* Give do_promote priority so we don't starve it out */ gpr_mu_unlock(&pollset->mu); - gpr_mu_lock(&pollset->mu); + locked = 0; goto done; } if (!pollset->kicked_without_pollers) { push_front_worker(pollset, worker); added_worker = 1; gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); - pollset->vtable->maybe_work(pollset, worker, deadline, now, 1); + pollset->vtable->maybe_work_and_unlock(pollset, worker, deadline, now, + NULL); + locked = 0; gpr_tls_set(&g_current_thread_poller, 0); } else { pollset->kicked_without_pollers = 0; } done: + if (!grpc_call_list_empty(call_list)) { + if (locked) { + gpr_mu_unlock(&pollset->mu); + locked = 0; + } + grpc_call_list_run(&call_list); + } + if (!locked) { + gpr_mu_lock(&pollset->mu); + locked = 1; + } grpc_wakeup_fd_destroy(&worker->wakeup_fd); if (added_worker) { remove_worker(pollset, worker); @@ -248,7 +233,8 @@ done: } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); - finish_shutdown(pollset); + finish_shutdown(pollset, &call_list); + grpc_call_list_run(&call_list); /* Continuing to access pollset here is safe -- it is the caller's * responsibility to not destroy when it has outstanding calls to * grpc_pollset_work. @@ -258,9 +244,8 @@ done: } } -void grpc_pollset_shutdown(grpc_pollset *pollset, - void (*shutdown_done)(void *arg), - void *shutdown_done_arg) { +void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure, + grpc_call_list *call_list) { int call_shutdown = 0; gpr_mu_lock(&pollset->mu); GPR_ASSERT(!pollset->shutting_down); @@ -270,13 +255,12 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, pollset->called_shutdown = 1; call_shutdown = 1; } - pollset->shutdown_done_cb = shutdown_done; - pollset->shutdown_done_arg = shutdown_done_arg; + pollset->shutdown_done = closure; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); gpr_mu_unlock(&pollset->mu); if (call_shutdown) { - finish_shutdown(pollset); + finish_shutdown(pollset, call_list); } } @@ -317,12 +301,12 @@ typedef struct grpc_unary_promote_args { grpc_closure promotion_closure; } grpc_unary_promote_args; -static void basic_do_promote(void *args, int success) { +static void basic_do_promote(void *args, int success, + grpc_call_list *call_list) { grpc_unary_promote_args *up_args = args; const grpc_pollset_vtable *original_vtable = up_args->original_vtable; grpc_pollset *pollset = up_args->pollset; grpc_fd *fd = up_args->fd; - int do_shutdown_cb = 0; /* * This is quite tricky. There are a number of cases to keep in mind here: @@ -349,19 +333,20 @@ static void basic_do_promote(void *args, int success) { if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { GPR_ASSERT(!grpc_pollset_has_workers(pollset)); pollset->called_shutdown = 1; - do_shutdown_cb = 1; + grpc_call_list_add(call_list, pollset->shutdown_done, 1); } } else if (grpc_fd_is_orphaned(fd)) { /* Don't try to add it to anything, we'll drop our ref on it below */ } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd(pollset, fd, 0); + pollset->vtable->add_fd(pollset, fd, 0, call_list); } else if (fd != pollset->data.ptr) { grpc_fd *fds[2]; fds[0] = pollset->data.ptr; fds[1] = fd; if (fds[0] && !grpc_fd_is_orphaned(fds[0])) { - grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds), + call_list); GRPC_FD_UNREF(fds[0], "basicpoll"); } else { /* old fd is orphaned and we haven't cleaned it up until now, so remain a @@ -376,16 +361,15 @@ static void basic_do_promote(void *args, int success) { gpr_mu_unlock(&pollset->mu); - if (do_shutdown_cb) { - pollset->shutdown_done_cb(pollset->shutdown_done_arg); - } - /* Matching ref in basic_pollset_add_fd */ GRPC_FD_UNREF(fd, "basicpoll_add"); + + grpc_call_list_run(call_list); } static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, - int and_unlock_pollset) { + int and_unlock_pollset, + grpc_call_list *call_list) { grpc_unary_promote_args *up_args; GPR_ASSERT(fd); if (fd == pollset->data.ptr) goto exit; @@ -402,7 +386,8 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, pollset->data.ptr = fd; GRPC_FD_REF(fd, "basicpoll"); } else if (!grpc_fd_is_orphaned(fds[0])) { - grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds), + call_list); GRPC_FD_UNREF(fds[0], "basicpoll"); } else { /* old fd is orphaned and we haven't cleaned it up until now, so remain a @@ -424,7 +409,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, up_args->promotion_closure.cb = basic_do_promote; up_args->promotion_closure.cb_arg = up_args; - grpc_pollset_add_idle_job(pollset, &up_args->promotion_closure); + grpc_call_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); exit: @@ -434,7 +419,8 @@ exit: } static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, - int and_unlock_pollset) { + int and_unlock_pollset, + grpc_call_list *call_list) { GPR_ASSERT(fd); if (fd == pollset->data.ptr) { GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); @@ -446,10 +432,11 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, } } -static void basic_pollset_maybe_work(grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now, - int allow_synchronous_callback) { +static void basic_pollset_maybe_work_and_unlock(grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, + gpr_timespec now, + grpc_call_list *call_list) { struct pollfd pfd[2]; grpc_fd *fd; grpc_fd_watcher fd_watcher; @@ -487,7 +474,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, if (fd) { grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, - pfd[1].revents & POLLOUT); + pfd[1].revents & POLLOUT, call_list); } if (r < 0) { @@ -502,15 +489,13 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, } if (nfds > 1) { if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) { - grpc_fd_become_readable(fd, allow_synchronous_callback); + grpc_fd_become_readable(fd, call_list); } if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) { - grpc_fd_become_writable(fd, allow_synchronous_callback); + grpc_fd_become_writable(fd, call_list); } } } - - gpr_mu_lock(&pollset->mu); } static void basic_pollset_destroy(grpc_pollset *pollset) { @@ -521,8 +506,9 @@ static void basic_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work, - basic_pollset_destroy, basic_pollset_destroy}; + basic_pollset_add_fd, basic_pollset_del_fd, + basic_pollset_maybe_work_and_unlock, basic_pollset_destroy, + basic_pollset_destroy}; static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { pollset->vtable = &basic_pollset; |