diff options
Diffstat (limited to 'src/core/lib')
22 files changed, 487 insertions, 131 deletions
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 66ba601adb..b89b8af15a 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -45,6 +45,7 @@ #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/support/string.h" static grpc_wakeup_fd global_wakeup_fd; static int g_epfd; @@ -77,8 +78,21 @@ static void fd_global_shutdown(void); typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state; +static const char *kick_state_string(kick_state st) { + switch (st) { + case UNKICKED: + return "UNKICKED"; + case KICKED: + return "KICKED"; + case DESIGNATED_POLLER: + return "DESIGNATED_POLLER"; + } + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} + struct grpc_pollset_worker { kick_state kick_state; + int kick_state_mutator; // which line of code last changed kick state bool initialized_cv; grpc_pollset_worker *next; grpc_pollset_worker *prev; @@ -86,6 +100,12 @@ struct grpc_pollset_worker { grpc_closure_list schedule_on_end_work; }; +#define SET_KICK_STATE(worker, state) \ + do { \ + (worker)->kick_state = (state); \ + (worker)->kick_state_mutator = __LINE__; \ + } while (false) + #define MAX_NEIGHBOURHOODS 1024 typedef struct pollset_neighbourhood { @@ -100,10 +120,15 @@ struct grpc_pollset { bool reassigning_neighbourhood; grpc_pollset_worker *root_worker; bool kicked_without_poller; + + /* Set to true if the pollset is observed to have no workers available to + * poll */ bool seen_inactive; - bool shutting_down; /* Is the pollset shutting down ? */ - bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */ + bool shutting_down; /* Is the pollset shutting down ? */ grpc_closure *shutdown_closure; /* Called after after shutdown is complete */ + + /* Number of workers who are *about-to* attach themselves to the pollset + * worker list */ int begin_refs; grpc_pollset *next; @@ -263,29 +288,23 @@ static bool fd_is_shutdown(grpc_fd *fd) { static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - grpc_lfev_set_ready(exec_ctx, &fd->read_closure); - - /* Note, it is possible that fd_become_readable might be called twice with - different 'notifier's when an fd becomes readable and it is in two epoll - sets (This can happen briefly during polling island merges). In such cases - it does not really matter which notifer is set as the read_notifier_pollset - (They would both point to the same polling island anyway) */ + grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); /* Use release store to match with acquire load in fd_get_read_notifier */ gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier); } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } /******************************************************************************* @@ -410,18 +429,28 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; do { - if (worker->initialized_cv) { - worker->kick_state = KICKED; - gpr_cv_signal(&worker->cv); - } else { - worker->kick_state = KICKED; - append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd), - "pollset_shutdown"); + switch (worker->kick_state) { + case KICKED: + break; + case UNKICKED: + SET_KICK_STATE(worker, KICKED); + if (worker->initialized_cv) { + gpr_cv_signal(&worker->cv); + } + break; + case DESIGNATED_POLLER: + SET_KICK_STATE(worker, KICKED); + append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd), + "pollset_kick_all"); + break; } worker = worker->next; } while (worker != pollset->root_worker); } + // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here + // in the else case + return error; } @@ -437,7 +466,9 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(pollset->shutdown_closure == NULL); + GPR_ASSERT(!pollset->shutting_down); pollset->shutdown_closure = closure; + pollset->shutting_down = true; GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset)); pollset_maybe_finish_shutdown(exec_ctx, pollset); } @@ -510,10 +541,14 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) { if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; - worker->kick_state = UNKICKED; + SET_KICK_STATE(worker, UNKICKED); worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; pollset->begin_refs++; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker); + } + if (pollset->seen_inactive) { // pollset has been observed to be inactive, we need to move back to the // active list @@ -529,6 +564,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, retry_lock_neighbourhood: gpr_mu_lock(&neighbourhood->mu); gpr_mu_lock(&pollset->mu); + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d", + pollset, worker, kick_state_string(worker->kick_state), + is_reassigning); + } if (pollset->seen_inactive) { if (neighbourhood != pollset->neighbourhood) { gpr_mu_unlock(&neighbourhood->mu); @@ -539,8 +579,14 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, pollset->seen_inactive = false; if (neighbourhood->active_root == NULL) { neighbourhood->active_root = pollset->next = pollset->prev = pollset; - if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) { - worker->kick_state = DESIGNATED_POLLER; + /* TODO: sreek. Why would this worker state be other than UNKICKED + * here ? (since the worker isn't added to the pollset yet, there is no + * way it can be "found" by other threads to get kicked). */ + + /* If there is no designated poller, make this the designated poller */ + if (worker->kick_state == UNKICKED && + gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) { + SET_KICK_STATE(worker, DESIGNATED_POLLER); } } else { pollset->next = neighbourhood->active_root; @@ -554,24 +600,53 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, } gpr_mu_unlock(&neighbourhood->mu); } + worker_insert(pollset, worker); pollset->begin_refs--; - if (worker->kick_state == UNKICKED) { + if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) { GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker); worker->initialized_cv = true; gpr_cv_init(&worker->cv); - while (worker->kick_state == UNKICKED && - pollset->shutdown_closure == NULL) { + while (worker->kick_state == UNKICKED && !pollset->shutting_down) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d", + pollset, worker, kick_state_string(worker->kick_state), + pollset->shutting_down); + } + if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) && worker->kick_state == UNKICKED) { - worker->kick_state = KICKED; + /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker + received a kick */ + SET_KICK_STATE(worker, KICKED); } } *now = gpr_now(now->clock_type); } - return worker->kick_state == DESIGNATED_POLLER && - pollset->shutdown_closure == NULL; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, + "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d " + "kicked_without_poller: %d", + pollset, worker, kick_state_string(worker->kick_state), + pollset->shutting_down, pollset->kicked_without_poller); + } + + /* We release pollset lock in this function at a couple of places: + * 1. Briefly when assigning pollset to a neighbourhood + * 2. When doing gpr_cv_wait() + * It is possible that 'kicked_without_poller' was set to true during (1) and + * 'shutting_down' is set to true during (1) or (2). If either of them is + * true, this worker cannot do polling */ + /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller + * case; especially when the worker is the DESIGNATED_POLLER */ + + if (pollset->kicked_without_poller) { + pollset->kicked_without_poller = false; + return false; + } + + return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down; } static bool check_neighbourhood_for_available_poller( @@ -591,10 +666,18 @@ static bool check_neighbourhood_for_available_poller( case UNKICKED: if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)inspect_worker)) { - inspect_worker->kick_state = DESIGNATED_POLLER; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. choose next poller to be %p", + inspect_worker); + } + SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER); if (inspect_worker->initialized_cv) { gpr_cv_signal(&inspect_worker->cv); } + } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. beaten to choose next poller"); + } } // even if we didn't win the cas, there's a worker, we can stop found_worker = true; @@ -607,9 +690,12 @@ static bool check_neighbourhood_for_available_poller( break; } inspect_worker = inspect_worker->next; - } while (inspect_worker != inspect->root_worker); + } while (!found_worker && inspect_worker != inspect->root_worker); } if (!found_worker) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect); + } inspect->seen_inactive = true; if (inspect == neighbourhood->active_root) { neighbourhood->active_root = @@ -627,15 +713,22 @@ static bool check_neighbourhood_for_available_poller( static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker); + } if (worker_hdl != NULL) *worker_hdl = NULL; - worker->kick_state = KICKED; + /* Make sure we appear kicked */ + SET_KICK_STATE(worker, KICKED); grpc_closure_list_move(&worker->schedule_on_end_work, &exec_ctx->closure_list); if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) { if (worker->next != worker && worker->next->kick_state == UNKICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker); + } GPR_ASSERT(worker->next->initialized_cv); gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next); - worker->next->kick_state = DESIGNATED_POLLER; + SET_KICK_STATE(worker->next, DESIGNATED_POLLER); gpr_cv_signal(&worker->next->cv); if (grpc_exec_ctx_has_work(exec_ctx)) { gpr_mu_unlock(&pollset->mu); @@ -644,9 +737,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { gpr_atm_no_barrier_store(&g_active_poller, 0); - gpr_mu_unlock(&pollset->mu); size_t poller_neighbourhood_idx = (size_t)(pollset->neighbourhood - g_neighbourhoods); + gpr_mu_unlock(&pollset->mu); bool found_worker = false; bool scan_state[MAX_NEIGHBOURHOODS]; for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) { @@ -682,6 +775,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. remove worker"); + } if (EMPTIED == worker_remove(pollset, worker)) { pollset_maybe_finish_shutdown(exec_ctx, pollset); } @@ -702,16 +798,18 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->kicked_without_poller = false; return GRPC_ERROR_NONE; } - gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { + gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); - GPR_ASSERT(!pollset->shutdown_closure); + GPR_ASSERT(!pollset->shutting_down); GPR_ASSERT(!pollset->seen_inactive); gpr_mu_unlock(&pollset->mu); append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline), err_desc); gpr_mu_lock(&pollset->mu); gpr_tls_set(&g_current_thread_worker, 0); + } else { + gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); } end_worker(exec_ctx, pollset, &worker, worker_hdl); gpr_tls_set(&g_current_thread_pollset, 0); @@ -720,46 +818,136 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_strvec log; + gpr_strvec_init(&log); + char *tmp; + gpr_asprintf( + &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, + specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset), + (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker); + gpr_strvec_add(&log, tmp); + if (pollset->root_worker != NULL) { + gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}", + kick_state_string(pollset->root_worker->kick_state), + pollset->root_worker->next, + kick_state_string(pollset->root_worker->next->kick_state)); + gpr_strvec_add(&log, tmp); + } + if (specific_worker != NULL) { + gpr_asprintf(&tmp, " worker_kick_state=%s", + kick_state_string(specific_worker->kick_state)); + gpr_strvec_add(&log, tmp); + } + tmp = gpr_strvec_flatten(&log, NULL); + gpr_strvec_destroy(&log); + gpr_log(GPR_ERROR, "%s", tmp); + gpr_free(tmp); + } if (specific_worker == NULL) { if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { grpc_pollset_worker *root_worker = pollset->root_worker; if (root_worker == NULL) { pollset->kicked_without_poller = true; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kicked_without_poller"); + } return GRPC_ERROR_NONE; } grpc_pollset_worker *next_worker = root_worker->next; - if (root_worker == next_worker && - root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load( - &g_active_poller)) { - root_worker->kick_state = KICKED; + if (root_worker->kick_state == KICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. already kicked %p", root_worker); + } + SET_KICK_STATE(root_worker, KICKED); + return GRPC_ERROR_NONE; + } else if (next_worker->kick_state == KICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. already kicked %p", next_worker); + } + SET_KICK_STATE(next_worker, KICKED); + return GRPC_ERROR_NONE; + } else if (root_worker == + next_worker && // only try and wake up a poller if + // there is no next worker + root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load( + &g_active_poller)) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kicked %p", root_worker); + } + SET_KICK_STATE(root_worker, KICKED); return grpc_wakeup_fd_wakeup(&global_wakeup_fd); } else if (next_worker->kick_state == UNKICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kicked %p", next_worker); + } GPR_ASSERT(next_worker->initialized_cv); - next_worker->kick_state = KICKED; + SET_KICK_STATE(next_worker, KICKED); gpr_cv_signal(&next_worker->cv); return GRPC_ERROR_NONE; + } else if (next_worker->kick_state == DESIGNATED_POLLER) { + if (root_worker->kick_state != DESIGNATED_POLLER) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log( + GPR_ERROR, + " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)", + root_worker, root_worker->initialized_cv, next_worker); + } + SET_KICK_STATE(root_worker, KICKED); + if (root_worker->initialized_cv) { + gpr_cv_signal(&root_worker->cv); + } + return GRPC_ERROR_NONE; + } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker, + root_worker); + } + SET_KICK_STATE(next_worker, KICKED); + return grpc_wakeup_fd_wakeup(&global_wakeup_fd); + } } else { + GPR_ASSERT(next_worker->kick_state == KICKED); + SET_KICK_STATE(next_worker, KICKED); return GRPC_ERROR_NONE; } } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kicked while waking up"); + } return GRPC_ERROR_NONE; } } else if (specific_worker->kick_state == KICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. specific worker already kicked"); + } return GRPC_ERROR_NONE; } else if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { - specific_worker->kick_state = KICKED; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker); + } + SET_KICK_STATE(specific_worker, KICKED); return GRPC_ERROR_NONE; } else if (specific_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) { - specific_worker->kick_state = KICKED; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kick active poller"); + } + SET_KICK_STATE(specific_worker, KICKED); return grpc_wakeup_fd_wakeup(&global_wakeup_fd); } else if (specific_worker->initialized_cv) { - specific_worker->kick_state = KICKED; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kick waiting worker"); + } + SET_KICK_STATE(specific_worker, KICKED); gpr_cv_signal(&specific_worker->cv); return GRPC_ERROR_NONE; } else { - specific_worker->kick_state = KICKED; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kick non-waiting worker"); + } + SET_KICK_STATE(specific_worker, KICKED); return GRPC_ERROR_NONE; } } @@ -805,6 +993,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, static void shutdown_engine(void) { fd_global_shutdown(); pollset_global_shutdown(); + close(g_epfd); } static const grpc_event_engine_vtable vtable = { @@ -841,9 +1030,6 @@ static const grpc_event_engine_vtable vtable = { /* It is possible that GLIBC has epoll but the underlying kernel doesn't. * Create a dummy epoll_fd to make sure epoll support is available */ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) { - /* TODO(ctiller): temporary, until this stabilizes */ - if (!explicit_request) return NULL; - if (!grpc_has_wakeup_fd()) { return NULL; } @@ -862,6 +1048,8 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) { return NULL; } + gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd); + return &vtable; } diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c index 27b4892d1d..5166dc2ac2 100644 --- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c +++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c @@ -1007,12 +1007,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); } /******************************************************************************* @@ -1223,7 +1223,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - grpc_lfev_set_ready(exec_ctx, &fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); /* Note, it is possible that fd_become_readable might be called twice with different 'notifier's when an fd becomes readable and it is in two epoll @@ -1235,7 +1235,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c index 49be72c03e..1058f69a83 100644 --- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c +++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c @@ -560,12 +560,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); } /******************************************************************************* @@ -696,11 +696,11 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } static void pollset_release_epoll_set(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 5690431759..a2fa4ad9a5 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -438,12 +438,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); } /******************************************************************************* @@ -710,7 +710,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - grpc_lfev_set_ready(exec_ctx, &fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); /* Note, it is possible that fd_become_readable might be called twice with different 'notifier's when an fd becomes readable and it is in two epoll @@ -722,7 +722,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { @@ -1049,8 +1049,8 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, /* Introduce a spurious completion. If we do not, then it may be that the fd-specific epoll set consumed a completion without being polled, leading to a missed edge going up. */ - grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure); - grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read"); + grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write"); pollset_kick_all(exec_ctx, pollset); pollset->current_pollable = &pollset->pollable; if (append_error(&error, pollable_materialize(&pollset->pollable), diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index 0d969dccce..a84c208e1a 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -933,12 +933,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); } /******************************************************************************* @@ -1115,7 +1115,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - grpc_lfev_set_ready(exec_ctx, &fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); /* Note, it is possible that fd_become_readable might be called twice with different 'notifier's when an fd becomes readable and it is in two epoll @@ -1127,7 +1127,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, @@ -1731,7 +1731,7 @@ const grpc_event_engine_vtable *grpc_init_epollsig_linux( if (!is_grpc_wakeup_signal_initialized) { /* TODO(ctiller): when other epoll engines are ready, remove the true || to * force this to be explitly chosen if needed */ - if (true || explicit_request) { + if (explicit_request) { grpc_use_signal(SIGRTMIN + 6); } else { return NULL; diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c index 8b1245c640..df5d23af3b 100644 --- a/src/core/lib/iomgr/iomgr_uv.c +++ b/src/core/lib/iomgr/iomgr_uv.c @@ -21,12 +21,20 @@ #ifdef GRPC_UV #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/pollset_uv.h" #include "src/core/lib/iomgr/tcp_uv.h" +gpr_thd_id g_init_thread; + void grpc_iomgr_platform_init(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_global_init(); grpc_register_tracer(&grpc_tcp_trace); + grpc_executor_set_threading(&exec_ctx, false); + g_init_thread = gpr_thd_currentid(); + grpc_exec_ctx_finish(&exec_ctx); } void grpc_iomgr_platform_flush(void) {} void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_uv.h new file mode 100644 index 0000000000..3b4daaa73b --- /dev/null +++ b/src/core/lib/iomgr/iomgr_uv.h @@ -0,0 +1,37 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_UV_H +#define GRPC_CORE_LIB_IOMGR_IOMGR_UV_H + +#include "src/core/lib/iomgr/iomgr_internal.h" + +#include <grpc/support/thd.h> + +/* The thread ID of the thread on which grpc was initialized. Used to verify + * that all calls into libuv are made on that same thread */ +extern gpr_thd_id g_init_thread; + +#ifdef GRPC_UV_THREAD_CHECK +#define GRPC_UV_ASSERT_SAME_THREAD() \ + GPR_ASSERT(gpr_thd_currentid() == g_init_thread) +#else +#define GRPC_UV_ASSERT_SAME_THREAD() +#endif /* GRPC_UV_THREAD_CHECK */ + +#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_UV_H */ diff --git a/src/core/lib/iomgr/lockfree_event.c b/src/core/lib/iomgr/lockfree_event.c index c2ceecb3c5..f967b22ba9 100644 --- a/src/core/lib/iomgr/lockfree_event.c +++ b/src/core/lib/iomgr/lockfree_event.c @@ -79,12 +79,12 @@ bool grpc_lfev_is_shutdown(gpr_atm *state) { } void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state, - grpc_closure *closure) { + grpc_closure *closure, const char *variable) { while (true) { gpr_atm curr = gpr_atm_no_barrier_load(state); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "lfev_notify_on: %p curr=%p closure=%p", state, - (void *)curr, closure); + gpr_log(GPR_ERROR, "lfev_notify_on[%s]: %p curr=%p closure=%p", variable, + state, (void *)curr, closure); } switch (curr) { case CLOSURE_NOT_READY: { @@ -149,7 +149,7 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state, while (true) { gpr_atm curr = gpr_atm_no_barrier_load(state); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "lfev_set_shutdown: %p curr=%p err=%s", state, + gpr_log(GPR_ERROR, "lfev_set_shutdown: %p curr=%p err=%s", state, (void *)curr, grpc_error_string(shutdown_err)); } switch (curr) { @@ -193,12 +193,14 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state, GPR_UNREACHABLE_CODE(return false); } -void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state) { +void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state, + const char *variable) { while (true) { gpr_atm curr = gpr_atm_no_barrier_load(state); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "lfev_set_ready: %p curr=%p", state, (void *)curr); + gpr_log(GPR_ERROR, "lfev_set_ready[%s]: %p curr=%p", variable, state, + (void *)curr); } switch (curr) { diff --git a/src/core/lib/iomgr/lockfree_event.h b/src/core/lib/iomgr/lockfree_event.h index ef3844a07a..6a14a0f3b2 100644 --- a/src/core/lib/iomgr/lockfree_event.h +++ b/src/core/lib/iomgr/lockfree_event.h @@ -30,10 +30,11 @@ void grpc_lfev_destroy(gpr_atm *state); bool grpc_lfev_is_shutdown(gpr_atm *state); void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state, - grpc_closure *closure); + grpc_closure *closure, const char *variable); /* Returns true on first successful shutdown */ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state, grpc_error *shutdown_err); -void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state); +void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state, + const char *variable); #endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */ diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index 946f0c8c60..a79fe89d3e 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -28,6 +28,7 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_uv.h" @@ -70,6 +71,7 @@ void grpc_pollset_global_init(void) { } void grpc_pollset_global_shutdown(void) { + GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_destroy(&grpc_polling_mu); uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb); } @@ -79,6 +81,7 @@ static void timer_run_cb(uv_timer_t *timer) {} static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; } void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { + GRPC_UV_ASSERT_SAME_THREAD(); *mu = &grpc_polling_mu; uv_timer_init(uv_default_loop(), &pollset->timer); pollset->shutting_down = 0; @@ -87,6 +90,7 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(!pollset->shutting_down); + GRPC_UV_ASSERT_SAME_THREAD(); pollset->shutting_down = 1; if (grpc_pollset_work_run_loop) { // Drain any pending UV callbacks without blocking @@ -99,6 +103,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + GRPC_UV_ASSERT_SAME_THREAD(); uv_close((uv_handle_t *)&pollset->timer, timer_close_cb); // timer.data is a boolean indicating that the timer has finished closing pollset->timer.data = (void *)0; @@ -113,6 +118,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { uint64_t timeout; + GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_unlock(&grpc_polling_mu); if (grpc_pollset_work_run_loop) { if (gpr_time_cmp(deadline, now) >= 0) { @@ -141,6 +147,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + GRPC_UV_ASSERT_SAME_THREAD(); uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index a98b8e62db..2d438e8b48 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -30,6 +30,7 @@ #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -114,11 +115,14 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_error *error; int retry_status; + char *port = r->port; gpr_free(req); retry_status = retry_named_port_failure(status, r, getaddrinfo_callback); if (retry_status == 0) { - // The request is being retried. Nothing should be done here + /* The request is being retried. It is using its own port string, so we free + * the original one */ + gpr_free(port); return; } /* Either no retry was attempted, or the retry failed. Either way, the @@ -171,6 +175,8 @@ static grpc_error *blocking_resolve_address_impl( grpc_error *err; int retry_status; + GRPC_UV_ASSERT_SAME_THREAD(); + req.addrinfo = NULL; err = try_split_host_port(name, default_port, &host, &port); @@ -218,16 +224,19 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_pollset_set *interested_parties, grpc_closure *on_done, grpc_resolved_addresses **addrs) { - uv_getaddrinfo_t *req; - request *r; - struct addrinfo *hints; - char *host; - char *port; + uv_getaddrinfo_t *req = NULL; + request *r = NULL; + struct addrinfo *hints = NULL; + char *host = NULL; + char *port = NULL; grpc_error *err; int s; + GRPC_UV_ASSERT_SAME_THREAD(); err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, on_done, err); + gpr_free(host); + gpr_free(port); return; } r = gpr_malloc(sizeof(request)); diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c index 99dc2f1c78..3f4145d104 100644 --- a/src/core/lib/iomgr/sockaddr_utils.c +++ b/src/core/lib/iomgr/sockaddr_utils.c @@ -220,6 +220,11 @@ const char *grpc_sockaddr_get_uri_scheme( return NULL; } +int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; + return addr->sa_family; +} + int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) { const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; switch (addr->sa_family) { diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h index 7692b969f2..a589a19705 100644 --- a/src/core/lib/iomgr/sockaddr_utils.h +++ b/src/core/lib/iomgr/sockaddr_utils.h @@ -75,4 +75,6 @@ char *grpc_sockaddr_to_uri(const grpc_resolved_address *addr); /* Returns the URI scheme corresponding to \a addr */ const char *grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *addr); +int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr); + #endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 2f1d237c07..786c456b73 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -26,6 +26,7 @@ #include <grpc/support/log.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/iomgr/tcp_uv.h" @@ -124,6 +125,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, (void)channel_args; (void)interested_parties; + GRPC_UV_ASSERT_SAME_THREAD(); + if (channel_args != NULL) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2ab836cc34..3b9332321f 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -20,6 +20,7 @@ #ifdef GRPC_UV +#include <assert.h> #include <string.h> #include <grpc/support/alloc.h> @@ -27,6 +28,7 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/tcp_server.h" @@ -43,6 +45,8 @@ struct grpc_tcp_listener { struct grpc_tcp_listener *next; bool closed; + + bool has_pending_connection; }; struct grpc_tcp_server { @@ -104,6 +108,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { + GRPC_UV_ASSERT_SAME_THREAD(); gpr_ref(&s->refs); return s; } @@ -168,6 +173,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { + GRPC_UV_ASSERT_SAME_THREAD(); if (gpr_unref(&s->refs)) { /* Complete shutdown_starting work before destroying. */ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; @@ -183,18 +189,49 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } -static void accepted_connection_close_cb(uv_handle_t *handle) { - gpr_free(handle); -} - -static void on_connect(uv_stream_t *server, int status) { - grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; +static void finish_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *sp) { + grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); uv_tcp_t *client; grpc_endpoint *ep = NULL; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address peer_name; char *peer_name_string; int err; + uv_tcp_t *server = sp->handle; + + client = gpr_malloc(sizeof(uv_tcp_t)); + uv_tcp_init(uv_default_loop(), client); + // UV documentation says this is guaranteed to succeed + uv_accept((uv_stream_t *)server, (uv_stream_t *)client); + peer_name_string = NULL; + memset(&peer_name, 0, sizeof(grpc_resolved_address)); + peer_name.len = sizeof(struct sockaddr_storage); + err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, + (int *)&peer_name.len); + if (err == 0) { + peer_name_string = grpc_sockaddr_to_uri(&peer_name); + } else { + gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(err)); + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (peer_name_string) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection: %s", + sp->server, peer_name_string); + } else { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection", sp->server); + } + } + ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); + acceptor->from_server = sp->server; + acceptor->port_index = sp->port_index; + acceptor->fd_index = 0; + sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, + acceptor); + gpr_free(peer_name_string); +} + +static void on_connect(uv_stream_t *server, int status) { + grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (status < 0) { switch (status) { @@ -207,35 +244,19 @@ static void on_connect(uv_stream_t *server, int status) { } } - client = gpr_malloc(sizeof(uv_tcp_t)); - uv_tcp_init(uv_default_loop(), client); - // UV documentation says this is guaranteed to succeed - uv_accept((uv_stream_t *)server, (uv_stream_t *)client); - // If the server has not been started, we discard incoming connections - if (sp->server->on_accept_cb == NULL) { - uv_close((uv_handle_t *)client, accepted_connection_close_cb); + GPR_ASSERT(!sp->has_pending_connection); + + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server); + } + + // Create acceptor. + if (sp->server->on_accept_cb) { + finish_accept(&exec_ctx, sp); } else { - peer_name_string = NULL; - memset(&peer_name, 0, sizeof(grpc_resolved_address)); - peer_name.len = sizeof(struct sockaddr_storage); - err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, - (int *)&peer_name.len); - if (err == 0) { - peer_name_string = grpc_sockaddr_to_uri(&peer_name); - } else { - gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); - } - ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); - // Create acceptor. - grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); - acceptor->from_server = sp->server; - acceptor->port_index = sp->port_index; - acceptor->fd_index = 0; - sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, - acceptor); - grpc_exec_ctx_finish(&exec_ctx); - gpr_free(peer_name_string); + sp->has_pending_connection = true; } + grpc_exec_ctx_finish(&exec_ctx); } static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, @@ -282,7 +303,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, GPR_ASSERT(port >= 0); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp = gpr_zalloc(sizeof(grpc_tcp_listener)); sp->next = NULL; if (s->head == NULL) { s->head = sp; @@ -316,6 +337,9 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, unsigned port_index = 0; int status; grpc_error *error = GRPC_ERROR_NONE; + int family; + + GRPC_UV_ASSERT_SAME_THREAD(); if (s->tail != NULL) { port_index = s->tail->port_index + 1; @@ -353,7 +377,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, } handle = gpr_malloc(sizeof(uv_tcp_t)); - status = uv_tcp_init(uv_default_loop(), handle); + + family = grpc_sockaddr_get_family(addr); + status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); +#if defined(GPR_LINUX) && defined(SO_REUSEPORT) + if (family == AF_INET || family == AF_INET6) { + int fd; + uv_fileno((uv_handle_t *)handle, &fd); + int enable = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); + } +#endif /* GPR_LINUX && SO_REUSEPORT */ + if (status == 0) { error = add_socket_to_server(s, handle, addr, port_index, &sp); } else { @@ -366,6 +401,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, gpr_free(allocated_addr); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + char *port_string; + grpc_sockaddr_to_string(&port_string, addr, 0); + const char *str = grpc_error_string(error); + if (port_string) { + gpr_log(GPR_DEBUG, "SERVER %p add_port %s error=%s", s, port_string, str); + gpr_free(port_string); + } else { + gpr_log(GPR_DEBUG, "SERVER %p add_port error=%s", s, str); + } + } + if (error != GRPC_ERROR_NONE) { grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to add port to server", &error, 1); @@ -385,13 +432,19 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_tcp_listener *sp; (void)pollsets; (void)pollset_count; + GRPC_UV_ASSERT_SAME_THREAD(); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "SERVER_START %p", server); + } GPR_ASSERT(on_accept_cb); GPR_ASSERT(!server->on_accept_cb); server->on_accept_cb = on_accept_cb; server->on_accept_cb_arg = cb_arg; for (sp = server->head; sp; sp = sp->next) { - GPR_ASSERT(uv_listen((uv_stream_t *)sp->handle, SOMAXCONN, on_connect) == - 0); + if (sp->has_pending_connection) { + finish_accept(exec_ctx, sp); + sp->has_pending_connection = false; + } } } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 7c6a9b85f5..a05c19b4ac 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -30,6 +30,7 @@ #include <grpc/support/string_util.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/iomgr/tcp_uv.h" @@ -183,6 +184,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_tcp *tcp = (grpc_tcp *)ep; int status; grpc_error *error = GRPC_ERROR_NONE; + GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; @@ -236,6 +238,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, unsigned int i; grpc_slice *slice; uv_write_t *write_req; + GRPC_UV_ASSERT_SAME_THREAD(); if (GRPC_TRACER_ON(grpc_tcp_trace)) { size_t j; @@ -307,6 +310,10 @@ static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { grpc_tcp *tcp = (grpc_tcp *)ep; if (!tcp->shutting_down) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + const char *str = grpc_error_string(why); + gpr_log(GPR_DEBUG, "TCP %p shutdown why=%s", tcp->handle, str); + } tcp->shutting_down = true; uv_shutdown_t *req = &tcp->shutdown_req; uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 1ab82ef1d5..70f49bcbe8 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -24,6 +24,7 @@ #include <grpc/support/log.h> #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/timer.h" #include <uv.h> @@ -43,6 +44,7 @@ static void stop_uv_timer(uv_timer_t *handle) { void run_expired_timer(uv_timer_t *handle) { grpc_timer *timer = (grpc_timer *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(timer->pending); timer->pending = 0; GRPC_CLOSURE_SCHED(&exec_ctx, timer->closure, GRPC_ERROR_NONE); @@ -55,6 +57,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec now) { uint64_t timeout; uv_timer_t *uv_timer; + GRPC_UV_ASSERT_SAME_THREAD(); timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { timer->pending = 0; @@ -75,6 +78,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { + GRPC_UV_ASSERT_SAME_THREAD(); if (timer->pending) { timer->pending = 0; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); diff --git a/src/core/lib/support/env.h b/src/core/lib/support/env.h index 18bc08ac62..e2c012a728 100644 --- a/src/core/lib/support/env.h +++ b/src/core/lib/support/env.h @@ -36,6 +36,12 @@ char *gpr_getenv(const char *name); /* Sets the the environment with the specified name to the specified value. */ void gpr_setenv(const char *name, const char *value); +/* This is a version of gpr_getenv that does not produce any output if it has to + use an insecure version of the function. It is ONLY to be used to solve the + problem in which we need to check an env variable to configure the verbosity + level of logging. So DO NOT USE THIS. */ +const char *gpr_getenv_silent(const char *name, char **dst); + #ifdef __cplusplus } #endif diff --git a/src/core/lib/support/env_linux.c b/src/core/lib/support/env_linux.c index 0c79a2c401..4c45a977ca 100644 --- a/src/core/lib/support/env_linux.c +++ b/src/core/lib/support/env_linux.c @@ -38,7 +38,9 @@ #include "src/core/lib/support/string.h" -char *gpr_getenv(const char *name) { +const char *gpr_getenv_silent(const char *name, char **dst) { + const char *insecure_func_used = NULL; + char *result = NULL; #if defined(GPR_BACKWARDS_COMPATIBILITY_MODE) typedef char *(*getenv_type)(const char *); static getenv_type getenv_func = NULL; @@ -48,22 +50,28 @@ char *gpr_getenv(const char *name) { for (size_t i = 0; getenv_func == NULL && i < GPR_ARRAY_SIZE(names); i++) { getenv_func = (getenv_type)dlsym(RTLD_DEFAULT, names[i]); if (getenv_func != NULL && strstr(names[i], "secure") == NULL) { - gpr_log(GPR_DEBUG, - "Warning: insecure environment read function '%s' used", - names[i]); + insecure_func_used = names[i]; } } - char *result = getenv_func(name); - return result == NULL ? result : gpr_strdup(result); + result = getenv_func(name); #elif __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 17) - char *result = secure_getenv(name); - return result == NULL ? result : gpr_strdup(result); + result = secure_getenv(name); #else - gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used", - "getenv"); - char *result = getenv(name); - return result == NULL ? result : gpr_strdup(result); + result = getenv(name); + insecure_func_used = "getenv"; #endif + *dst = result == NULL ? result : gpr_strdup(result); + return insecure_func_used; +} + +char *gpr_getenv(const char *name) { + char *result = NULL; + const char *insecure_func_used = gpr_getenv_silent(name, &result); + if (insecure_func_used != NULL) { + gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used", + insecure_func_used); + } + return result; } void gpr_setenv(const char *name, const char *value) { diff --git a/src/core/lib/support/env_posix.c b/src/core/lib/support/env_posix.c index bdbc4da95a..b88822ca02 100644 --- a/src/core/lib/support/env_posix.c +++ b/src/core/lib/support/env_posix.c @@ -29,6 +29,11 @@ #include <grpc/support/string_util.h> #include "src/core/lib/support/string.h" +const char *gpr_getenv_silent(const char *name, char **dst) { + *dst = gpr_getenv(name); + return NULL; +} + char *gpr_getenv(const char *name) { char *result = getenv(name); return result == NULL ? result : gpr_strdup(result); diff --git a/src/core/lib/support/env_windows.c b/src/core/lib/support/env_windows.c index c1d557e219..652eeb61c6 100644 --- a/src/core/lib/support/env_windows.c +++ b/src/core/lib/support/env_windows.c @@ -30,6 +30,11 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +const char *gpr_getenv_silent(const char *name, char **dst) { + *dst = gpr_getenv(name); + return NULL; +} + char *gpr_getenv(const char *name) { char *result = NULL; DWORD size; diff --git a/src/core/lib/support/log.c b/src/core/lib/support/log.c index bcc336b8ae..fadb4d9a2c 100644 --- a/src/core/lib/support/log.c +++ b/src/core/lib/support/log.c @@ -64,7 +64,8 @@ void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) { } void gpr_log_verbosity_init() { - char *verbosity = gpr_getenv("GRPC_VERBOSITY"); + char *verbosity = NULL; + const char *insecure_getenv = gpr_getenv_silent("GRPC_VERBOSITY", &verbosity); gpr_atm min_severity_to_print = GPR_LOG_SEVERITY_ERROR; if (verbosity != NULL) { @@ -81,6 +82,11 @@ void gpr_log_verbosity_init() { GPR_LOG_VERBOSITY_UNSET) { gpr_atm_no_barrier_store(&g_min_severity_to_print, min_severity_to_print); } + + if (insecure_getenv != NULL) { + gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used", + insecure_getenv); + } } void gpr_set_log_function(gpr_log_func f) { |