diff options
-rw-r--r-- | build.yaml | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.c | 235 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_thread_pool_linux.c | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.c | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollsig_linux.c | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/lockfree_event.c | 14 | ||||
-rw-r--r-- | src/core/lib/iomgr/lockfree_event.h | 5 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 633 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 3 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 5 | ||||
-rw-r--r-- | test/core/surface/completion_queue_test.c | 2 | ||||
-rw-r--r-- | test/core/surface/completion_queue_threading_test.c | 3 | ||||
m--------- | third_party/boringssl | 0 | ||||
-rw-r--r-- | tools/run_tests/generated/tests.json | 3 | ||||
-rwxr-xr-x | tools/run_tests/run_tests.py | 4 |
16 files changed, 589 insertions, 353 deletions
diff --git a/build.yaml b/build.yaml index 940ca8902d..abf97c58bd 100644 --- a/build.yaml +++ b/build.yaml @@ -4438,6 +4438,7 @@ targets: - grpc - gpr_test_util - gpr + timeout_seconds: 1200 - name: writes_per_rpc_test gtest: true cpu_cost: 0.5 diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 66ba601adb..fb54e9c954 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -45,6 +45,7 @@ #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/support/string.h" static grpc_wakeup_fd global_wakeup_fd; static int g_epfd; @@ -77,8 +78,21 @@ static void fd_global_shutdown(void); typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state; +static const char *kick_state_string(kick_state st) { + switch (st) { + case UNKICKED: + return "UNKICKED"; + case KICKED: + return "KICKED"; + case DESIGNATED_POLLER: + return "DESIGNATED_POLLER"; + } + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} + struct grpc_pollset_worker { kick_state kick_state; + int kick_state_mutator; // which line of code last changed kick state bool initialized_cv; grpc_pollset_worker *next; grpc_pollset_worker *prev; @@ -86,6 +100,12 @@ struct grpc_pollset_worker { grpc_closure_list schedule_on_end_work; }; +#define SET_KICK_STATE(worker, state) \ + do { \ + (worker)->kick_state = (state); \ + (worker)->kick_state_mutator = __LINE__; \ + } while (false) + #define MAX_NEIGHBOURHOODS 1024 typedef struct pollset_neighbourhood { @@ -263,17 +283,17 @@ static bool fd_is_shutdown(grpc_fd *fd) { static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - grpc_lfev_set_ready(exec_ctx, &fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); /* Note, it is possible that fd_become_readable might be called twice with different 'notifier's when an fd becomes readable and it is in two epoll @@ -285,7 +305,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } /******************************************************************************* @@ -410,13 +430,20 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; do { - if (worker->initialized_cv) { - worker->kick_state = KICKED; - gpr_cv_signal(&worker->cv); - } else { - worker->kick_state = KICKED; - append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd), - "pollset_shutdown"); + switch (worker->kick_state) { + case KICKED: + break; + case UNKICKED: + SET_KICK_STATE(worker, KICKED); + if (worker->initialized_cv) { + gpr_cv_signal(&worker->cv); + } + break; + case DESIGNATED_POLLER: + SET_KICK_STATE(worker, KICKED); + append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd), + "pollset_shutdown"); + break; } worker = worker->next; @@ -437,7 +464,9 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(pollset->shutdown_closure == NULL); + GPR_ASSERT(!pollset->shutting_down); pollset->shutdown_closure = closure; + pollset->shutting_down = true; GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset)); pollset_maybe_finish_shutdown(exec_ctx, pollset); } @@ -510,10 +539,14 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) { if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; - worker->kick_state = UNKICKED; + SET_KICK_STATE(worker, UNKICKED); worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; pollset->begin_refs++; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker); + } + if (pollset->seen_inactive) { // pollset has been observed to be inactive, we need to move back to the // active list @@ -529,6 +562,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, retry_lock_neighbourhood: gpr_mu_lock(&neighbourhood->mu); gpr_mu_lock(&pollset->mu); + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d", + pollset, worker, kick_state_string(worker->kick_state), + is_reassigning); + } if (pollset->seen_inactive) { if (neighbourhood != pollset->neighbourhood) { gpr_mu_unlock(&neighbourhood->mu); @@ -539,8 +577,9 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, pollset->seen_inactive = false; if (neighbourhood->active_root == NULL) { neighbourhood->active_root = pollset->next = pollset->prev = pollset; - if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) { - worker->kick_state = DESIGNATED_POLLER; + if (worker->kick_state == UNKICKED && + gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) { + SET_KICK_STATE(worker, DESIGNATED_POLLER); } } else { pollset->next = neighbourhood->active_root; @@ -560,18 +599,26 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker); worker->initialized_cv = true; gpr_cv_init(&worker->cv); - while (worker->kick_state == UNKICKED && - pollset->shutdown_closure == NULL) { + while (worker->kick_state == UNKICKED && !pollset->shutting_down) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d", + pollset, worker, kick_state_string(worker->kick_state), + pollset->shutting_down); + } if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) && worker->kick_state == UNKICKED) { - worker->kick_state = KICKED; + SET_KICK_STATE(worker, KICKED); } } *now = gpr_now(now->clock_type); } + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d", pollset, + worker, kick_state_string(worker->kick_state), + pollset->shutting_down); + } - return worker->kick_state == DESIGNATED_POLLER && - pollset->shutdown_closure == NULL; + return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down; } static bool check_neighbourhood_for_available_poller( @@ -591,10 +638,18 @@ static bool check_neighbourhood_for_available_poller( case UNKICKED: if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)inspect_worker)) { - inspect_worker->kick_state = DESIGNATED_POLLER; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. choose next poller to be %p", + inspect_worker); + } + SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER); if (inspect_worker->initialized_cv) { gpr_cv_signal(&inspect_worker->cv); } + } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. beaten to choose next poller"); + } } // even if we didn't win the cas, there's a worker, we can stop found_worker = true; @@ -607,9 +662,12 @@ static bool check_neighbourhood_for_available_poller( break; } inspect_worker = inspect_worker->next; - } while (inspect_worker != inspect->root_worker); + } while (!found_worker && inspect_worker != inspect->root_worker); } if (!found_worker) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect); + } inspect->seen_inactive = true; if (inspect == neighbourhood->active_root) { neighbourhood->active_root = @@ -627,15 +685,22 @@ static bool check_neighbourhood_for_available_poller( static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker); + } if (worker_hdl != NULL) *worker_hdl = NULL; - worker->kick_state = KICKED; + /* Make sure we appear kicked */ + SET_KICK_STATE(worker, KICKED); grpc_closure_list_move(&worker->schedule_on_end_work, &exec_ctx->closure_list); if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) { if (worker->next != worker && worker->next->kick_state == UNKICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker); + } GPR_ASSERT(worker->next->initialized_cv); gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next); - worker->next->kick_state = DESIGNATED_POLLER; + SET_KICK_STATE(worker->next, DESIGNATED_POLLER); gpr_cv_signal(&worker->next->cv); if (grpc_exec_ctx_has_work(exec_ctx)) { gpr_mu_unlock(&pollset->mu); @@ -644,9 +709,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { gpr_atm_no_barrier_store(&g_active_poller, 0); - gpr_mu_unlock(&pollset->mu); size_t poller_neighbourhood_idx = (size_t)(pollset->neighbourhood - g_neighbourhoods); + gpr_mu_unlock(&pollset->mu); bool found_worker = false; bool scan_state[MAX_NEIGHBOURHOODS]; for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) { @@ -682,6 +747,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, " .. remove worker"); + } if (EMPTIED == worker_remove(pollset, worker)) { pollset_maybe_finish_shutdown(exec_ctx, pollset); } @@ -702,16 +770,18 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->kicked_without_poller = false; return GRPC_ERROR_NONE; } - gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { + gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); - GPR_ASSERT(!pollset->shutdown_closure); + GPR_ASSERT(!pollset->shutting_down); GPR_ASSERT(!pollset->seen_inactive); gpr_mu_unlock(&pollset->mu); append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline), err_desc); gpr_mu_lock(&pollset->mu); gpr_tls_set(&g_current_thread_worker, 0); + } else { + gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); } end_worker(exec_ctx, pollset, &worker, worker_hdl); gpr_tls_set(&g_current_thread_pollset, 0); @@ -720,46 +790,136 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static grpc_error *pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_strvec log; + gpr_strvec_init(&log); + char *tmp; + gpr_asprintf( + &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, + specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset), + (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker); + gpr_strvec_add(&log, tmp); + if (pollset->root_worker != NULL) { + gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}", + kick_state_string(pollset->root_worker->kick_state), + pollset->root_worker->next, + kick_state_string(pollset->root_worker->next->kick_state)); + gpr_strvec_add(&log, tmp); + } + if (specific_worker != NULL) { + gpr_asprintf(&tmp, " worker_kick_state=%s", + kick_state_string(specific_worker->kick_state)); + gpr_strvec_add(&log, tmp); + } + tmp = gpr_strvec_flatten(&log, NULL); + gpr_strvec_destroy(&log); + gpr_log(GPR_ERROR, "%s", tmp); + gpr_free(tmp); + } if (specific_worker == NULL) { if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { grpc_pollset_worker *root_worker = pollset->root_worker; if (root_worker == NULL) { pollset->kicked_without_poller = true; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kicked_without_poller"); + } return GRPC_ERROR_NONE; } grpc_pollset_worker *next_worker = root_worker->next; - if (root_worker == next_worker && - root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load( - &g_active_poller)) { - root_worker->kick_state = KICKED; + if (root_worker->kick_state == KICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. already kicked %p", root_worker); + } + SET_KICK_STATE(root_worker, KICKED); + return GRPC_ERROR_NONE; + } else if (next_worker->kick_state == KICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. already kicked %p", next_worker); + } + SET_KICK_STATE(next_worker, KICKED); + return GRPC_ERROR_NONE; + } else if (root_worker == + next_worker && // only try and wake up a poller if + // there is no next worker + root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load( + &g_active_poller)) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kicked %p", root_worker); + } + SET_KICK_STATE(root_worker, KICKED); return grpc_wakeup_fd_wakeup(&global_wakeup_fd); } else if (next_worker->kick_state == UNKICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kicked %p", next_worker); + } GPR_ASSERT(next_worker->initialized_cv); - next_worker->kick_state = KICKED; + SET_KICK_STATE(next_worker, KICKED); gpr_cv_signal(&next_worker->cv); return GRPC_ERROR_NONE; + } else if (next_worker->kick_state == DESIGNATED_POLLER) { + if (root_worker->kick_state != DESIGNATED_POLLER) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log( + GPR_ERROR, + " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)", + root_worker, root_worker->initialized_cv, next_worker); + } + SET_KICK_STATE(root_worker, KICKED); + if (root_worker->initialized_cv) { + gpr_cv_signal(&root_worker->cv); + } + return GRPC_ERROR_NONE; + } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker, + root_worker); + } + SET_KICK_STATE(next_worker, KICKED); + return grpc_wakeup_fd_wakeup(&global_wakeup_fd); + } } else { + GPR_ASSERT(next_worker->kick_state == KICKED); + SET_KICK_STATE(next_worker, KICKED); return GRPC_ERROR_NONE; } } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kicked while waking up"); + } return GRPC_ERROR_NONE; } } else if (specific_worker->kick_state == KICKED) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. specific worker already kicked"); + } return GRPC_ERROR_NONE; } else if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { - specific_worker->kick_state = KICKED; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker); + } + SET_KICK_STATE(specific_worker, KICKED); return GRPC_ERROR_NONE; } else if (specific_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) { - specific_worker->kick_state = KICKED; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kick active poller"); + } + SET_KICK_STATE(specific_worker, KICKED); return grpc_wakeup_fd_wakeup(&global_wakeup_fd); } else if (specific_worker->initialized_cv) { - specific_worker->kick_state = KICKED; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kick waiting worker"); + } + SET_KICK_STATE(specific_worker, KICKED); gpr_cv_signal(&specific_worker->cv); return GRPC_ERROR_NONE; } else { - specific_worker->kick_state = KICKED; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_ERROR, " .. kick non-waiting worker"); + } + SET_KICK_STATE(specific_worker, KICKED); return GRPC_ERROR_NONE; } } @@ -841,9 +1001,6 @@ static const grpc_event_engine_vtable vtable = { /* It is possible that GLIBC has epoll but the underlying kernel doesn't. * Create a dummy epoll_fd to make sure epoll support is available */ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) { - /* TODO(ctiller): temporary, until this stabilizes */ - if (!explicit_request) return NULL; - if (!grpc_has_wakeup_fd()) { return NULL; } @@ -862,6 +1019,8 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) { return NULL; } + gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd); + return &vtable; } diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c index 2c91ad357c..23e2189c76 100644 --- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c +++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c @@ -1011,12 +1011,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); } /******************************************************************************* @@ -1227,7 +1227,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - grpc_lfev_set_ready(exec_ctx, &fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); /* Note, it is possible that fd_become_readable might be called twice with different 'notifier's when an fd becomes readable and it is in two epoll @@ -1239,7 +1239,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c index 49be72c03e..1058f69a83 100644 --- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c +++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c @@ -560,12 +560,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); } /******************************************************************************* @@ -696,11 +696,11 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { } static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } static void pollset_release_epoll_set(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 5574838187..ceecff7a88 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -412,12 +412,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); } /******************************************************************************* @@ -676,7 +676,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - grpc_lfev_set_ready(exec_ctx, &fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); /* Note, it is possible that fd_become_readable might be called twice with different 'notifier's when an fd becomes readable and it is in two epoll @@ -688,7 +688,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index 255e07010b..29d5bf8e73 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -937,12 +937,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); } /******************************************************************************* @@ -1119,7 +1119,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - grpc_lfev_set_ready(exec_ctx, &fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); /* Note, it is possible that fd_become_readable might be called twice with different 'notifier's when an fd becomes readable and it is in two epoll @@ -1131,7 +1131,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - grpc_lfev_set_ready(exec_ctx, &fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, @@ -1735,7 +1735,7 @@ const grpc_event_engine_vtable *grpc_init_epollsig_linux( if (!is_grpc_wakeup_signal_initialized) { /* TODO(ctiller): when other epoll engines are ready, remove the true || to * force this to be explitly chosen if needed */ - if (true || explicit_request) { + if (explicit_request) { grpc_use_signal(SIGRTMIN + 6); } else { return NULL; diff --git a/src/core/lib/iomgr/lockfree_event.c b/src/core/lib/iomgr/lockfree_event.c index c2ceecb3c5..f967b22ba9 100644 --- a/src/core/lib/iomgr/lockfree_event.c +++ b/src/core/lib/iomgr/lockfree_event.c @@ -79,12 +79,12 @@ bool grpc_lfev_is_shutdown(gpr_atm *state) { } void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state, - grpc_closure *closure) { + grpc_closure *closure, const char *variable) { while (true) { gpr_atm curr = gpr_atm_no_barrier_load(state); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "lfev_notify_on: %p curr=%p closure=%p", state, - (void *)curr, closure); + gpr_log(GPR_ERROR, "lfev_notify_on[%s]: %p curr=%p closure=%p", variable, + state, (void *)curr, closure); } switch (curr) { case CLOSURE_NOT_READY: { @@ -149,7 +149,7 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state, while (true) { gpr_atm curr = gpr_atm_no_barrier_load(state); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "lfev_set_shutdown: %p curr=%p err=%s", state, + gpr_log(GPR_ERROR, "lfev_set_shutdown: %p curr=%p err=%s", state, (void *)curr, grpc_error_string(shutdown_err)); } switch (curr) { @@ -193,12 +193,14 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state, GPR_UNREACHABLE_CODE(return false); } -void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state) { +void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state, + const char *variable) { while (true) { gpr_atm curr = gpr_atm_no_barrier_load(state); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "lfev_set_ready: %p curr=%p", state, (void *)curr); + gpr_log(GPR_ERROR, "lfev_set_ready[%s]: %p curr=%p", variable, state, + (void *)curr); } switch (curr) { diff --git a/src/core/lib/iomgr/lockfree_event.h b/src/core/lib/iomgr/lockfree_event.h index ef3844a07a..6a14a0f3b2 100644 --- a/src/core/lib/iomgr/lockfree_event.h +++ b/src/core/lib/iomgr/lockfree_event.h @@ -30,10 +30,11 @@ void grpc_lfev_destroy(gpr_atm *state); bool grpc_lfev_is_shutdown(gpr_atm *state); void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state, - grpc_closure *closure); + grpc_closure *closure, const char *variable); /* Returns true on first successful shutdown */ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state, grpc_error *shutdown_err); -void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state); +void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state, + const char *variable); #endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */ diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b04aee6c73..14e55eda85 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -189,16 +189,19 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; - size_t (*size)(); - void (*begin_op)(grpc_completion_queue *cc, void *tag); - void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *tag, + size_t data_size; + void (*init)(void *data); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq); + void (*destroy)(void *data); + void (*begin_op)(grpc_completion_queue *cq, void *tag); + void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); - grpc_event (*next)(grpc_completion_queue *cc, gpr_timespec deadline, + grpc_event (*next)(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved); - grpc_event (*pluck)(grpc_completion_queue *cc, void *tag, + grpc_event (*pluck)(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved); } cq_vtable; @@ -218,25 +221,28 @@ typedef struct grpc_cq_event_queue { gpr_atm num_queue_items; } grpc_cq_event_queue; -/* TODO: sreek Refactor this based on the completion_type. Put completion-type - * specific data in a different structure (and co-allocate memory for it along - * with completion queue + pollset )*/ -typedef struct cq_data { - gpr_mu *mu; +typedef struct cq_next_data { + /** Completed events for completion-queues of type GRPC_CQ_NEXT */ + grpc_cq_event_queue queue; + /** Counter of how many things have ever been queued on this completion queue + useful for avoiding locks to check the queue */ + gpr_atm things_queued_ever; + + /* Number of outstanding events (+1 if not shut down) */ + gpr_atm pending_events; + + int shutdown_called; +} cq_next_data; + +typedef struct cq_pluck_data { /** Completed events for completion-queues of type GRPC_CQ_PLUCK */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; - /** Completed events for completion-queues of type GRPC_CQ_NEXT */ - grpc_cq_event_queue queue; - /** Number of pending events (+1 if we're not shutdown) */ gpr_refcount pending_events; - /** Once owning_refs drops to zero, we will destroy the cq */ - gpr_refcount owning_refs; - /** Counter of how many things have ever been queued on this completion queue useful for avoiding locks to check the queue */ gpr_atm things_queued_ever; @@ -245,37 +251,45 @@ typedef struct cq_data { gpr_atm shutdown; int shutdown_called; - int is_server_cq; - int num_pluckers; - int num_polls; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; - grpc_closure pollset_shutdown_done; +} cq_pluck_data; + +/* Completion queue structure */ +struct grpc_completion_queue { + /** Once owning_refs drops to zero, we will destroy the cq */ + gpr_refcount owning_refs; + + gpr_mu *mu; + + const cq_vtable *vtable; + const cq_poller_vtable *poller_vtable; #ifndef NDEBUG void **outstanding_tags; size_t outstanding_tag_count; size_t outstanding_tag_capacity; #endif -} cq_data; -/* Completion queue structure */ -struct grpc_completion_queue { - cq_data data; - const cq_vtable *vtable; - const cq_poller_vtable *poller_vtable; + grpc_closure pollset_shutdown_done; + int num_polls; }; /* Forward declarations */ -static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc); - -static size_t cq_size(grpc_completion_queue *cc); - -static void cq_begin_op(grpc_completion_queue *cc, void *tag); +static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); +static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq); + +static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag); +static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag); static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -283,39 +297,51 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage); static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage); -static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, +static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved); -static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, +static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved); +static void cq_init_next(void *data); +static void cq_init_pluck(void *data); +static void cq_destroy_next(void *data); +static void cq_destroy_pluck(void *data); + /* Completion queue vtables based on the completion-type */ static const cq_vtable g_cq_vtable[] = { /* GRPC_CQ_NEXT */ - {.cq_completion_type = GRPC_CQ_NEXT, - .size = cq_size, - .begin_op = cq_begin_op, + {.data_size = sizeof(cq_next_data), + .cq_completion_type = GRPC_CQ_NEXT, + .init = cq_init_next, + .shutdown = cq_shutdown_next, + .destroy = cq_destroy_next, + .begin_op = cq_begin_op_for_next, .end_op = cq_end_op_for_next, .next = cq_next, .pluck = NULL}, /* GRPC_CQ_PLUCK */ - {.cq_completion_type = GRPC_CQ_PLUCK, - .size = cq_size, - .begin_op = cq_begin_op, + {.data_size = sizeof(cq_pluck_data), + .cq_completion_type = GRPC_CQ_PLUCK, + .init = cq_init_pluck, + .shutdown = cq_shutdown_pluck, + .destroy = cq_destroy_pluck, + .begin_op = cq_begin_op_for_pluck, .end_op = cq_end_op_for_pluck, .next = NULL, .pluck = cq_pluck}, }; -#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) -#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1) +#define DATA_FROM_CQ(cq) ((void *)(cq + 1)) +#define POLLSET_FROM_CQ(cq) \ + ((grpc_pollset *)(cq->vtable->data_size + (char *)DATA_FROM_CQ(cq))) grpc_tracer_flag grpc_cq_pluck_trace = GRPC_TRACER_INITIALIZER(true); grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true); @@ -329,7 +355,7 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = GRPC_TRACER_INITIALIZER(true); gpr_free(_ev); \ } -static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, +static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq, grpc_error *error); static void cq_event_queue_init(grpc_cq_event_queue *q) { @@ -342,9 +368,9 @@ static void cq_event_queue_destroy(grpc_cq_event_queue *q) { gpr_mpscq_destroy(&q->queue); } -static void cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { +static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { gpr_mpscq_push(&q->queue, (gpr_mpscq_node *)c); - gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1); + return gpr_atm_no_barrier_fetch_add(&q->num_queue_items, 1) == 0; } static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { @@ -367,16 +393,10 @@ static long cq_event_queue_num_items(grpc_cq_event_queue *q) { return (long)gpr_atm_no_barrier_load(&q->num_queue_items); } -static size_t cq_size(grpc_completion_queue *cc) { - /* Size of the completion queue and the size of the pollset whose memory is - allocated right after that of completion queue */ - return sizeof(grpc_completion_queue) + cc->poller_vtable->size(); -} - grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type) { - grpc_completion_queue *cc; + grpc_completion_queue *cq; GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0); @@ -389,158 +409,173 @@ grpc_completion_queue *grpc_completion_queue_create_internal( const cq_poller_vtable *poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; - cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()); - cq_data *cqd = &cc->data; + cq = gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size + + poller_vtable->size()); - cc->vtable = vtable; - cc->poller_vtable = poller_vtable; + cq->vtable = vtable; + cq->poller_vtable = poller_vtable; - poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->data.mu); + /* One for destroy(), one for pollset_shutdown */ + gpr_ref_init(&cq->owning_refs, 2); -#ifndef NDEBUG - cqd->outstanding_tags = NULL; - cqd->outstanding_tag_capacity = 0; -#endif + poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu); + vtable->init(DATA_FROM_CQ(cq)); + + GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq, + grpc_schedule_on_exec_ctx); + + GPR_TIMER_END("grpc_completion_queue_create_internal", 0); + + return cq; +} +static void cq_init_next(void *ptr) { + cq_next_data *cqd = ptr; + /* Initial ref is dropped by grpc_completion_queue_shutdown */ + gpr_atm_no_barrier_store(&cqd->pending_events, 1); + cqd->shutdown_called = false; + gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); + cq_event_queue_init(&cqd->queue); +} + +static void cq_destroy_next(void *ptr) { + cq_next_data *cqd = ptr; + GPR_ASSERT(cq_event_queue_num_items(&cqd->queue) == 0); + cq_event_queue_destroy(&cqd->queue); +} + +static void cq_init_pluck(void *ptr) { + cq_pluck_data *cqd = ptr; /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cqd->pending_events, 1); - /* One for destroy(), one for pollset_shutdown */ - gpr_ref_init(&cqd->owning_refs, 2); cqd->completed_tail = &cqd->completed_head; cqd->completed_head.next = (uintptr_t)cqd->completed_tail; gpr_atm_no_barrier_store(&cqd->shutdown, 0); cqd->shutdown_called = 0; - cqd->is_server_cq = 0; cqd->num_pluckers = 0; - cqd->num_polls = 0; gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0); -#ifndef NDEBUG - cqd->outstanding_tag_count = 0; -#endif - cq_event_queue_init(&cqd->queue); - GRPC_CLOSURE_INIT(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc, - grpc_schedule_on_exec_ctx); - - GPR_TIMER_END("grpc_completion_queue_create_internal", 0); +} - return cc; +static void cq_destroy_pluck(void *ptr) { + cq_pluck_data *cqd = ptr; + GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); } -grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { - return cc->vtable->cq_completion_type; +grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cq) { + return cq->vtable->cq_completion_type; } -int grpc_get_cq_poll_num(grpc_completion_queue *cc) { +int grpc_get_cq_poll_num(grpc_completion_queue *cq) { int cur_num_polls; - gpr_mu_lock(cc->data.mu); - cur_num_polls = cc->data.num_polls; - gpr_mu_unlock(cc->data.mu); + gpr_mu_lock(cq->mu); + cur_num_polls = cq->num_polls; + gpr_mu_unlock(cq->mu); return cur_num_polls; } #ifndef NDEBUG -void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, +void grpc_cq_internal_ref(grpc_completion_queue *cq, const char *reason, const char *file, int line) { - cq_data *cqd = &cc->data; if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) { - gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count); + gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val + 1, + "CQ:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1, reason); } #else -void grpc_cq_internal_ref(grpc_completion_queue *cc) { - cq_data *cqd = &cc->data; +void grpc_cq_internal_ref(grpc_completion_queue *cq) { #endif - gpr_ref(&cqd->owning_refs); + gpr_ref(&cq->owning_refs); } static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_completion_queue *cc = arg; - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cc, "pollset_destroy"); + grpc_completion_queue *cq = arg; + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "pollset_destroy"); } #ifndef NDEBUG -void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, +void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, const char *reason, const char *file, int line) { - cq_data *cqd = &cc->data; if (GRPC_TRACER_ON(grpc_trace_cq_refcount)) { - gpr_atm val = gpr_atm_no_barrier_load(&cqd->owning_refs.count); + gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cc, val, val - 1, + "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1, reason); } #else void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc) { - cq_data *cqd = &cc->data; + grpc_completion_queue *cq) { #endif - if (gpr_unref(&cqd->owning_refs)) { - GPR_ASSERT(cqd->completed_head.next == (uintptr_t)&cqd->completed_head); - cc->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cc)); - cq_event_queue_destroy(&cqd->queue); + if (gpr_unref(&cq->owning_refs)) { + cq->vtable->destroy(DATA_FROM_CQ(cq)); + cq->poller_vtable->destroy(exec_ctx, POLLSET_FROM_CQ(cq)); #ifndef NDEBUG - gpr_free(cqd->outstanding_tags); + gpr_free(cq->outstanding_tags); #endif - gpr_free(cc); + gpr_free(cq); } } -static void cq_begin_op(grpc_completion_queue *cc, void *tag) { - cq_data *cqd = &cc->data; -#ifndef NDEBUG - gpr_mu_lock(cqd->mu); +static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + GPR_ASSERT(!cqd->shutdown_called); + gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1); +} + +static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(!cqd->shutdown_called); - if (cqd->outstanding_tag_count == cqd->outstanding_tag_capacity) { - cqd->outstanding_tag_capacity = - GPR_MAX(4, 2 * cqd->outstanding_tag_capacity); - cqd->outstanding_tags = - gpr_realloc(cqd->outstanding_tags, sizeof(*cqd->outstanding_tags) * - cqd->outstanding_tag_capacity); - } - cqd->outstanding_tags[cqd->outstanding_tag_count++] = tag; - gpr_mu_unlock(cqd->mu); -#endif gpr_ref(&cqd->pending_events); } -void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { - cc->vtable->begin_op(cc, tag); +void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) { +#ifndef NDEBUG + gpr_mu_lock(cq->mu); + if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) { + cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity); + cq->outstanding_tags = + gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) * + cq->outstanding_tag_capacity); + } + cq->outstanding_tags[cq->outstanding_tag_count++] = tag; + gpr_mu_unlock(cq->mu); +#endif + cq->vtable->begin_op(cq, tag); } #ifndef NDEBUG -static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) { - cq_data *cqd = &cc->data; +static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) { int found = 0; if (lock_cq) { - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); } - for (int i = 0; i < (int)cqd->outstanding_tag_count; i++) { - if (cqd->outstanding_tags[i] == tag) { - cqd->outstanding_tag_count--; - GPR_SWAP(void *, cqd->outstanding_tags[i], - cqd->outstanding_tags[cqd->outstanding_tag_count]); + for (int i = 0; i < (int)cq->outstanding_tag_count; i++) { + if (cq->outstanding_tags[i] == tag) { + cq->outstanding_tag_count--; + GPR_SWAP(void *, cq->outstanding_tags[i], + cq->outstanding_tags[cq->outstanding_tag_count]); found = 1; break; } } if (lock_cq) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); } GPR_ASSERT(found); } #else -static void cq_check_tag(grpc_completion_queue *cc, void *tag, bool lock_cq) {} +static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} #endif -/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a + * completion * type of GRPC_CQ_NEXT) */ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, @@ -553,16 +588,16 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_next(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "cq_end_op_for_next(exec_ctx=%p, cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); if (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } } - cq_data *cqd = &cc->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); storage->tag = tag; @@ -570,28 +605,42 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, storage->done_arg = done_arg; storage->next = (uintptr_t)(is_success); - cq_check_tag(cc, tag, true); /* Used in debug builds only */ + cq_check_tag(cq, tag, true); /* Used in debug builds only */ /* Add the completion to the queue */ - cq_event_queue_push(&cqd->queue, storage); + bool is_first = cq_event_queue_push(&cqd->queue, storage); gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - - gpr_mu_lock(cqd->mu); - - int shutdown = gpr_unref(&cqd->pending_events); - if (!shutdown) { - grpc_error *kick_error = cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), NULL); - gpr_mu_unlock(cqd->mu); - - if (kick_error != GRPC_ERROR_NONE) { - const char *msg = grpc_error_string(kick_error); - gpr_log(GPR_ERROR, "Kick failed: %s", msg); - - GRPC_ERROR_UNREF(kick_error); + bool will_definitely_shutdown = + gpr_atm_no_barrier_load(&cqd->pending_events) == 1; + + if (!will_definitely_shutdown) { + /* Only kick if this is the first item queued */ + if (is_first) { + gpr_mu_lock(cq->mu); + grpc_error *kick_error = + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); + gpr_mu_unlock(cq->mu); + + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + GRPC_ERROR_UNREF(kick_error); + } + } + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } } else { - cq_finish_shutdown(exec_ctx, cc); - gpr_mu_unlock(cqd->mu); + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_atm_rel_store(&cqd->pending_events, 0); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } GPR_TIMER_END("cq_end_op_for_next", 0); @@ -599,16 +648,17 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a completion +/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a + * completion * type of GRPC_CQ_PLUCK) */ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc, void *tag, + grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); @@ -618,9 +668,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, error != GRPC_ERROR_NONE)) { const char *errmsg = grpc_error_string(error); GRPC_API_TRACE( - "cq_end_op_for_pluck(exec_ctx=%p, cc=%p, tag=%p, error=%s, " + "cq_end_op_for_pluck(exec_ctx=%p, cq=%p, tag=%p, error=%s, " "done=%p, done_arg=%p, storage=%p)", - 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); + 7, (exec_ctx, cq, tag, errmsg, done, done_arg, storage)); if (GRPC_TRACER_ON(grpc_trace_operation_failures) && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); @@ -632,8 +682,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, storage->done_arg = done_arg; storage->next = ((uintptr_t)&cqd->completed_head) | ((uintptr_t)(is_success)); - gpr_mu_lock(cqd->mu); - cq_check_tag(cc, tag, false); /* Used in debug builds only */ + gpr_mu_lock(cq->mu); + cq_check_tag(cq, tag, false); /* Used in debug builds only */ /* Add to the list of completions */ gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); @@ -652,9 +702,9 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, } grpc_error *kick_error = - cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); @@ -663,8 +713,8 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(kick_error); } } else { - cq_finish_shutdown(exec_ctx, cc); - gpr_mu_unlock(cqd->mu); + cq_finish_shutdown_pluck(exec_ctx, cq); + gpr_mu_unlock(cq->mu); } GPR_TIMER_END("cq_end_op_for_pluck", 0); @@ -672,12 +722,12 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, +void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag, grpc_error *error, void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - cc->vtable->end_op(exec_ctx, cc, tag, error, done, done_arg, storage); + cq->vtable->end_op(exec_ctx, cq, tag, error, done, done_arg, storage); } typedef struct { @@ -692,7 +742,7 @@ typedef struct { static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; - cq_data *cqd = &cq->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = @@ -703,7 +753,8 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { gpr_atm_no_barrier_load(&cqd->things_queued_ever); /* Pop a cq_completion from the queue. Returns NULL if the queue is empty - * might return NULL in some cases even if the queue is not empty; but that + * might return NULL in some cases even if the queue is not empty; but + * that * is ok and doesn't affect correctness. Might effect the tail latencies a * bit) */ a->stolen_completion = cq_event_queue_pop(&cqd->queue); @@ -716,58 +767,56 @@ static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { } #ifndef NDEBUG -static void dump_pending_tags(grpc_completion_queue *cc) { +static void dump_pending_tags(grpc_completion_queue *cq) { if (!GRPC_TRACER_ON(grpc_trace_pending_tags)) return; - cq_data *cqd = &cc->data; - gpr_strvec v; gpr_strvec_init(&v); gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:")); - gpr_mu_lock(cqd->mu); - for (size_t i = 0; i < cqd->outstanding_tag_count; i++) { + gpr_mu_lock(cq->mu); + for (size_t i = 0; i < cq->outstanding_tag_count; i++) { char *s; - gpr_asprintf(&s, " %p", cqd->outstanding_tags[i]); + gpr_asprintf(&s, " %p", cq->outstanding_tags[i]); gpr_strvec_add(&v, s); } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); char *out = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); gpr_log(GPR_DEBUG, "%s", out); gpr_free(out); } #else -static void dump_pending_tags(grpc_completion_queue *cc) {} +static void dump_pending_tags(grpc_completion_queue *cq) {} #endif -static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, +static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { grpc_event ret; gpr_timespec now; - cq_data *cqd = &cc->data; + cq_next_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); GRPC_API_TRACE( "grpc_completion_queue_next(" - "cc=%p, " + "cq=%p, " "deadline=gpr_timespec { tv_sec: %" PRId64 ", tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, + 5, (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, reserved)); GPR_ASSERT(!reserved); - dump_pending_tags(cc); + dump_pending_tags(cq); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - GRPC_CQ_INTERNAL_REF(cc, "next"); + GRPC_CQ_INTERNAL_REF(cq, "next"); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever), - .cq = cc, + .cq = cq, .deadline = deadline, .stolen_completion = NULL, .tag = NULL, @@ -800,21 +849,24 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, /* If c == NULL it means either the queue is empty OR in an transient inconsistent state. If it is the latter, we shold do a 0-timeout poll so that the thread comes back quickly from poll to make a second - attempt at popping. Not doing this can potentially deadlock this thread + attempt at popping. Not doing this can potentially deadlock this + thread forever (if the deadline is infinity) */ if (cq_event_queue_num_items(&cqd->queue) > 0) { iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC); } } - if (gpr_atm_no_barrier_load(&cqd->shutdown)) { + if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) { /* Before returning, check if the queue has any items left over (since gpr_mpscq_pop() can sometimes return NULL even if the queue is not empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ if (cq_event_queue_num_items(&cqd->queue) > 0) { /* Go to the beginning of the loop. No point doing a poll because - (cc->shutdown == true) is only possible when there is no pending work - (i.e cc->pending_events == 0) and any outstanding grpc_cq_completion + (cq->shutdown == true) is only possible when there is no pending + work + (i.e cq->pending_events == 0) and any outstanding + grpc_cq_completion events are already queued on this cq */ continue; } @@ -828,16 +880,16 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } /* The main polling work happens in grpc_pollset_work */ - gpr_mu_lock(cqd->mu); - cqd->num_polls++; - grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), + gpr_mu_lock(cq->mu); + cq->num_polls++; + grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), NULL, now, iteration_deadline); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); if (err != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(err); @@ -846,30 +898,74 @@ static grpc_event cq_next(grpc_completion_queue *cc, gpr_timespec deadline, GRPC_ERROR_UNREF(err); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } is_finished_arg.first_loop = false; } - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "next"); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next"); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); + if (cq_event_queue_num_items(&cqd->queue) > 0 && + gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { + gpr_mu_lock(cq->mu); + cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), NULL); + gpr_mu_unlock(cq->mu); + } + GPR_TIMER_END("grpc_completion_queue_next", 0); return ret; } -grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, +/* Finishes the completion queue shutdown. This means that there are no more + completion events / tags expected from the completion queue + - Must be called under completion queue lock + - Must be called only once in completion queue's lifetime + - grpc_completion_queue_shutdown() MUST have been called before calling + this function */ +static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + + GPR_ASSERT(cqd->shutdown_called); + GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); + + cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); +} + +static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_next_data *cqd = DATA_FROM_CQ(cq); + + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + if (cqd->shutdown_called) { + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + GPR_TIMER_END("grpc_completion_queue_shutdown", 0); + return; + } + cqd->shutdown_called = 1; + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + cq_finish_shutdown_next(exec_ctx, cq); + } + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); +} + +grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { - return cc->vtable->next(cc, deadline, reserved); + return cq->vtable->next(cq, deadline, reserved); } -static int add_plucker(grpc_completion_queue *cc, void *tag, +static int add_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } @@ -879,9 +975,9 @@ static int add_plucker(grpc_completion_queue *cc, void *tag, return 1; } -static void del_plucker(grpc_completion_queue *cc, void *tag, +static void del_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); for (int i = 0; i < cqd->num_pluckers; i++) { if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { cqd->num_pluckers--; @@ -895,13 +991,13 @@ static void del_plucker(grpc_completion_queue *cc, void *tag, static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = arg; grpc_completion_queue *cq = a->cq; - cq_data *cqd = &cq->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); if (current_last_seen_things_queued_ever != a->last_seen_things_queued_ever) { - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); a->last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever); grpc_cq_completion *c; @@ -913,51 +1009,51 @@ static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { if (c == cqd->completed_tail) { cqd->completed_tail = prev; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); a->stolen_completion = c; return true; } prev = c; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); } return !a->first_loop && gpr_time_cmp(a->deadline, gpr_now(a->deadline.clock_type)) < 0; } -static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, +static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved) { grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; gpr_timespec now; - cq_data *cqd = &cc->data; + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); if (GRPC_TRACER_ON(grpc_cq_pluck_trace)) { GRPC_API_TRACE( "grpc_completion_queue_pluck(" - "cc=%p, tag=%p, " + "cq=%p, tag=%p, " "deadline=gpr_timespec { tv_sec: %" PRId64 ", tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 6, (cc, tag, deadline.tv_sec, deadline.tv_nsec, + 6, (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, reserved)); } GPR_ASSERT(!reserved); - dump_pending_tags(cc); + dump_pending_tags(cq); deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - GRPC_CQ_INTERNAL_REF(cc, "pluck"); - gpr_mu_lock(cqd->mu); + GRPC_CQ_INTERNAL_REF(cq, "pluck"); + gpr_mu_lock(cq->mu); cq_is_finished_arg is_finished_arg = { .last_seen_things_queued_ever = gpr_atm_no_barrier_load(&cqd->things_queued_ever), - .cq = cc, + .cq = cq, .deadline = deadline, .stolen_completion = NULL, .tag = tag, @@ -966,7 +1062,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, GRPC_EXEC_CTX_INITIALIZER(0, cq_is_pluck_finished, &is_finished_arg); for (;;) { if (is_finished_arg.stolen_completion != NULL) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); c = is_finished_arg.stolen_completion; is_finished_arg.stolen_completion = NULL; ret.type = GRPC_OP_COMPLETE; @@ -983,7 +1079,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, if (c == cqd->completed_tail) { cqd->completed_tail = prev; } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -993,54 +1089,54 @@ static grpc_event cq_pluck(grpc_completion_queue *cc, void *tag, prev = c; } if (gpr_atm_no_barrier_load(&cqd->shutdown)) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } - if (!add_plucker(cc, tag, &worker)) { + if (!add_plucker(cq, tag, &worker)) { gpr_log(GPR_DEBUG, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } now = gpr_now(GPR_CLOCK_MONOTONIC); if (!is_finished_arg.first_loop && gpr_time_cmp(now, deadline) >= 0) { - del_plucker(cc, tag, &worker); - gpr_mu_unlock(cqd->mu); + del_plucker(cq, tag, &worker); + gpr_mu_unlock(cq->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } - cqd->num_polls++; - grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), + cq->num_polls++; + grpc_error *err = cq->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cq), &worker, now, deadline); if (err != GRPC_ERROR_NONE) { - del_plucker(cc, tag, &worker); - gpr_mu_unlock(cqd->mu); + del_plucker(cq, tag, &worker); + gpr_mu_unlock(cq->mu); const char *msg = grpc_error_string(err); gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg); GRPC_ERROR_UNREF(err); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - dump_pending_tags(cc); + dump_pending_tags(cq); break; } is_finished_arg.first_loop = false; - del_plucker(cc, tag, &worker); + del_plucker(cq, tag, &worker); } done: - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "pluck"); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "pluck"); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(is_finished_arg.stolen_completion == NULL); @@ -1049,85 +1145,66 @@ done: return ret; } -grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, +grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved) { - return cc->vtable->pluck(cc, tag, deadline, reserved); + return cq->vtable->pluck(cq, tag, deadline, reserved); } -/* Finishes the completion queue shutdown. This means that there are no more - completion events / tags expected from the completion queue - - Must be called under completion queue lock - - Must be called only once in completion queue's lifetime - - grpc_completion_queue_shutdown() MUST have been called before calling - this function */ -static void cq_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_completion_queue *cc) { - cq_data *cqd = &cc->data; +static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); gpr_atm_no_barrier_store(&cqd->shutdown, 1); - cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cqd->pollset_shutdown_done); + cq->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cq), + &cq->pollset_shutdown_done); } -/* Shutdown simply drops a ref that we reserved at creation time; if we drop - to zero here, then enter shutdown mode and wake up any waiters */ -void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); - GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - cq_data *cqd = &cc->data; +static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, + grpc_completion_queue *cq) { + cq_pluck_data *cqd = DATA_FROM_CQ(cq); - gpr_mu_lock(cqd->mu); + gpr_mu_lock(cq->mu); if (cqd->shutdown_called) { - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } cqd->shutdown_called = 1; if (gpr_unref(&cqd->pending_events)) { - cq_finish_shutdown(&exec_ctx, cc); + cq_finish_shutdown_pluck(exec_ctx, cq); } - gpr_mu_unlock(cqd->mu); + gpr_mu_unlock(cq->mu); +} + +/* Shutdown simply drops a ref that we reserved at creation time; if we drop + to zero here, then enter shutdown mode and wake up any waiters */ +void grpc_completion_queue_shutdown(grpc_completion_queue *cq) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); + GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq)); + cq->vtable->shutdown(&exec_ctx, cq); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } -void grpc_completion_queue_destroy(grpc_completion_queue *cc) { - GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc)); +void grpc_completion_queue_destroy(grpc_completion_queue *cq) { + GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq)); GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0); - grpc_completion_queue_shutdown(cc); - - /* TODO (sreek): This should not ideally be here. Refactor it into the - * cq_vtable (perhaps have a create/destroy methods in the cq vtable) */ - if (cc->vtable->cq_completion_type == GRPC_CQ_NEXT) { - GPR_ASSERT(cq_event_queue_num_items(&cc->data.queue) == 0); - } + grpc_completion_queue_shutdown(cq); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cc, "destroy"); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "destroy"); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_destroy", 0); } -grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL; -} - -grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { - return CQ_FROM_POLLSET(ps); -} - -void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { - cc->data.is_server_cq = 1; -} - -bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { - return cc->data.is_server_cq; +grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cq) { + return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : NULL; } -bool grpc_cq_can_listen(grpc_completion_queue *cc) { - return cc->poller_vtable->can_listen; +bool grpc_cq_can_listen(grpc_completion_queue *cq) { + return cq->poller_vtable->can_listen; } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 97ea9cae20..af44482513 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -84,10 +84,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *done_arg, grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); -grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); -void grpc_cq_mark_server_cq(grpc_completion_queue *cc); -bool grpc_cq_is_server_cq(grpc_completion_queue *cc); bool grpc_cq_can_listen(grpc_completion_queue *cc); grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 84ddf74ab9..19177a46c9 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -951,8 +951,6 @@ static void register_completion_queue(grpc_server *server, if (server->cqs[i] == cq) return; } - grpc_cq_mark_server_cq(cq); - GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, @@ -1116,9 +1114,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, chand->channel = channel; size_t cq_idx; - grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset); for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { - if (s->cqs[cq_idx] == accepting_cq) break; + if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break; } if (cq_idx == s->cq_count) { /* completion queue not found: pick a random one to publish new calls to */ diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index c27337aaa8..f9d88d6327 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -93,7 +93,7 @@ static void test_pollset_conversion(void) { attr.cq_polling_type = polling_types[j]; cq = grpc_completion_queue_create( grpc_completion_queue_factory_lookup(&attr), &attr, NULL); - GPR_ASSERT(grpc_cq_from_pollset(grpc_cq_pollset(cq)) == cq); + GPR_ASSERT(grpc_cq_pollset(cq) != NULL); shutdown_and_destroy(cq); } } diff --git a/test/core/surface/completion_queue_threading_test.c b/test/core/surface/completion_queue_threading_test.c index 366565fb35..99d0fa4980 100644 --- a/test/core/surface/completion_queue_threading_test.c +++ b/test/core/surface/completion_queue_threading_test.c @@ -190,7 +190,8 @@ static void consumer_thread(void *arg) { gpr_log(GPR_INFO, "consumer %d phase 2", opt->id); for (;;) { - ev = grpc_completion_queue_next(opt->cc, ten_seconds_time(), NULL); + ev = grpc_completion_queue_next(opt->cc, + gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL); switch (ev.type) { case GRPC_OP_COMPLETE: GPR_ASSERT(ev.success); diff --git a/third_party/boringssl b/third_party/boringssl -Subproject be2ee342d3781ddb954f91f8a7e660c6f59e87e +Subproject 78684e5b222645828ca302e56b40b9daff2b2d2 diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index b66a84285a..d8eba49f05 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -3925,7 +3925,8 @@ "mac", "posix", "windows" - ] + ], + "timeout_seconds": 1200 }, { "args": [], diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 611868ce5a..ddfc1d68f1 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -63,8 +63,8 @@ _FORCE_ENVIRON_FOR_WRAPPERS = { } _POLLING_STRATEGIES = { - 'linux': ['epollsig', 'poll', 'poll-cv'], -# TODO(ctiller, sreecha): enable epoll1, epollex, epoll-thread-pool + 'linux': ['epoll1', 'epollsig', 'poll', 'poll-cv'], +# TODO(ctiller, sreecha): enable epollex, epoll-thread-pool 'mac': ['poll'], } |