diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/OWNERS | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/closure.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.c | 56 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_posix.c | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_windows.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr_posix.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr_uv.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_uv.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/pollset_windows.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_windows.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_generic.c | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_manager.c | 14 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_uv.c | 5 |
18 files changed, 86 insertions, 35 deletions
diff --git a/src/core/lib/iomgr/OWNERS b/src/core/lib/iomgr/OWNERS new file mode 100644 index 0000000000..8ee86c2618 --- /dev/null +++ b/src/core/lib/iomgr/OWNERS @@ -0,0 +1 @@ +@murgatroid99 *_uv.c *_uv.h diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index e028e72ed6..26f9cbe0fa 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -25,7 +25,7 @@ #include "src/core/lib/profiling/timers.h" #ifndef NDEBUG -grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false, "closure"); #endif #ifndef NDEBUG diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 7f9c5d837f..c72c37e2b5 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -27,7 +27,8 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" -grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_combiner_trace = + GRPC_TRACER_INITIALIZER(false, "combiner"); #define GRPC_COMBINER_TRACE(fn) \ do { \ diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index a95929a1fb..3759dda992 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -36,7 +36,8 @@ #include "src/core/lib/slice/slice_internal.h" #ifndef NDEBUG -grpc_tracer_flag grpc_trace_error_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_error_refcount = + GRPC_TRACER_INITIALIZER(false, "error_refcount"); #endif static const char *error_int_name(grpc_error_ints key) { diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index ceecff7a88..2499417356 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -103,6 +103,32 @@ typedef struct pollable { grpc_pollset_worker *root_worker; } pollable; +static const char *polling_obj_type_string(polling_obj_type t) { + switch (t) { + case PO_POLLING_GROUP: + return "polling_group"; + case PO_POLLSET_SET: + return "pollset_set"; + case PO_POLLSET: + return "pollset"; + case PO_FD: + return "fd"; + case PO_EMPTY_POLLABLE: + return "empty_pollable"; + case PO_COUNT: + return "<invalid:count>"; + } + return "<invalid>"; +} + +static char *pollable_desc(pollable *p) { + char *out; + gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d", + polling_obj_type_string(p->po.type), p->po.group, p->epfd, + p->wakeup.read_fd); + return out; +} + static pollable g_empty_pollable; static void pollable_init(pollable *p, polling_obj_type type); @@ -472,7 +498,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { GPR_ASSERT(epfd != -1); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "add fd %p to pollable %p", fd, p); + gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } gpr_mu_lock(&fd->orphaned_mu); @@ -537,10 +563,18 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, if (worker->pollable != &pollset->pollable) { gpr_mu_lock(&worker->pollable->po.mu); } - if (worker->initialized_cv) { + if (worker->initialized_cv && worker != pollset->root_worker) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", + pollset, worker, &pollset->pollable, worker->pollable); + } worker->kicked = true; gpr_cv_signal(&worker->cv); } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", + pollset, worker, &pollset->pollable, worker->pollable); + } append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup), "pollset_shutdown"); } @@ -770,7 +804,9 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int timeout = poll_deadline_to_millis_timeout(deadline, now); if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout); + char *desc = pollable_desc(p); + gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout); + gpr_free(desc); } if (timeout != 0) { @@ -985,10 +1021,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, static const char *err_desc = "pollset_add_fd"; grpc_error *error = GRPC_ERROR_NONE; if (pollset->current_pollable == &g_empty_pollable) { - if (GRPC_TRACER_ON(grpc_polling_trace)) + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from empty to fd", pollset, fd); + } /* empty pollable --> single fd pollable */ pollset_kick_all(exec_ctx, pollset); pollset->current_pollable = &fd->pollable; @@ -997,16 +1034,23 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu); REF_BY(fd, 2, "pollset_pollable"); } else if (pollset->current_pollable == &pollset->pollable) { - if (GRPC_TRACER_ON(grpc_polling_trace)) + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); + } append_error(&error, pollable_add_fd(pollset->current_pollable, fd), err_desc); } else if (pollset->current_pollable != &fd->pollable) { grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable; - if (GRPC_TRACER_ON(grpc_polling_trace)) + if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from fd %p to multipoller", pollset, fd, had_fd); + } + /* Introduce a spurious completion. + If we do not, then it may be that the fd-specific epoll set consumed + a completion without being polled, leading to a missed edge going up. */ + grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure); pollset_kick_all(exec_ctx, pollset); pollset->current_pollable = &pollset->pollable; if (append_error(&error, pollable_materialize(&pollset->pollable), diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 2648df393d..a5ae04cb5b 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -39,10 +39,11 @@ #include "src/core/lib/support/env.h" grpc_tracer_flag grpc_polling_trace = - GRPC_TRACER_INITIALIZER(false); /* Disabled by default */ + GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */ #ifndef NDEBUG -grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_fd_refcount = + GRPC_TRACER_INITIALIZER(false, "fd_refcount"); #endif /** Default poll() function - a pointer so that it can be overridden by some @@ -124,7 +125,7 @@ void grpc_set_event_engine_test_only( const char *grpc_get_poll_strategy_name() { return g_poll_strategy_name; } void grpc_event_engine_init(void) { - grpc_register_tracer("polling", &grpc_polling_trace); + grpc_register_tracer(&grpc_polling_trace); char *s = gpr_getenv("GRPC_POLL_STRATEGY"); if (s == NULL) { diff --git a/src/core/lib/iomgr/ev_windows.c b/src/core/lib/iomgr/ev_windows.c index 027609c7e8..c24dfaeaf7 100644 --- a/src/core/lib/iomgr/ev_windows.c +++ b/src/core/lib/iomgr/ev_windows.c @@ -23,6 +23,6 @@ #include "src/core/lib/debug/trace.h" grpc_tracer_flag grpc_polling_trace = - GRPC_TRACER_INITIALIZER(false); /* Disabled by default */ + GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */ #endif // GRPC_WINSOCK_SOCKET diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.c index 43f5d0406e..f5875a247e 100644 --- a/src/core/lib/iomgr/iomgr_posix.c +++ b/src/core/lib/iomgr/iomgr_posix.c @@ -28,7 +28,7 @@ void grpc_iomgr_platform_init(void) { grpc_wakeup_fd_global_init(); grpc_event_engine_init(); - grpc_register_tracer("tcp", &grpc_tcp_trace); + grpc_register_tracer(&grpc_tcp_trace); } void grpc_iomgr_platform_flush(void) {} diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c index 49d1a03c32..8b1245c640 100644 --- a/src/core/lib/iomgr/iomgr_uv.c +++ b/src/core/lib/iomgr/iomgr_uv.c @@ -26,7 +26,7 @@ void grpc_iomgr_platform_init(void) { grpc_pollset_global_init(); - grpc_register_tracer("tcp", &grpc_tcp_trace); + grpc_register_tracer(&grpc_tcp_trace); } void grpc_iomgr_platform_flush(void) {} void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index 1a54065a91..946f0c8c60 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -34,7 +34,8 @@ #include "src/core/lib/debug/trace.h" #ifndef NDEBUG -grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_fd_refcount = + GRPC_TRACER_INITIALIZER(false, "fd_refcount"); #endif struct grpc_pollset { diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 1bfc2a22a8..ea017a6054 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -31,7 +31,8 @@ #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) #ifndef NDEBUG -grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_fd_refcount = + GRPC_TRACER_INITIALIZER(false, "fd_refcount"); #endif gpr_mu grpc_polling_mu; diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index f2cc1be74e..a31d9eef93 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -29,7 +29,8 @@ #include "src/core/lib/iomgr/combiner.h" -grpc_tracer_flag grpc_resource_quota_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_resource_quota_trace = + GRPC_TRACER_INITIALIZER(false, "resource_quota"); #define MEMORY_USAGE_ESTIMATION_MAX 65536 diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 5de2b0f4ee..b6dcd15cb0 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -59,7 +59,7 @@ typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type; typedef size_t msg_iovlen_type; #endif -grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); typedef struct { grpc_endpoint base; diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index ff5fd3edc8..7c6a9b85f5 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -37,7 +37,7 @@ #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" -grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); typedef struct { grpc_endpoint base; diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 6704a158ce..2cbb97403b 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -48,7 +48,7 @@ #define GRPC_FIONBIO FIONBIO #endif -grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); static grpc_error *set_non_block(SOCKET sock) { int status; diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index e6a9eb0e86..12efce241f 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -41,8 +41,9 @@ #define MIN_QUEUE_WINDOW_DURATION 0.01 #define MAX_QUEUE_WINDOW_DURATION 1 -grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false); -grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer"); +grpc_tracer_flag grpc_timer_check_trace = + GRPC_TRACER_INITIALIZER(false, "timer_check"); /* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with * deadlines earlier than 'queue_deadline" cap are maintained in the heap and @@ -160,8 +161,8 @@ void grpc_timer_list_init(gpr_timespec now) { g_shared_mutables.min_timer = timespec_to_atm_round_down(now); gpr_tls_init(&g_last_seen_min_timer); gpr_tls_set(&g_last_seen_min_timer, 0); - grpc_register_tracer("timer", &grpc_timer_trace); - grpc_register_tracer("timer_check", &grpc_timer_check_trace); + grpc_register_tracer(&grpc_timer_trace); + grpc_register_tracer(&grpc_timer_check_trace); for (i = 0; i < NUM_SHARDS; i++) { timer_shard *shard = &g_shards[i]; diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index cb7998db97..b9bea9a2ab 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -56,7 +56,7 @@ static gpr_timespec g_timed_waiter_deadline; // generation counter to track which thread is waiting for the next timer static uint64_t g_timed_waiter_generation; -static void timer_thread(void *unused); +static void timer_thread(void *completed_thread_ptr); static void gc_completed_threads(void) { if (g_completed_threads != NULL) { @@ -81,10 +81,10 @@ static void start_timer_thread_and_unlock(void) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "Spawn timer thread"); } - gpr_thd_id thd; gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&thd, timer_thread, NULL, &opt); + completed_thread *ct = gpr_malloc(sizeof(*ct)); + gpr_thd_new(&ct->t, timer_thread, ct, &opt); } void grpc_timer_manager_tick() { @@ -245,7 +245,7 @@ static void timer_main_loop(grpc_exec_ctx *exec_ctx) { } } -static void timer_thread_cleanup(void) { +static void timer_thread_cleanup(completed_thread *ct) { gpr_mu_lock(&g_mu); // terminate the thread: drop the waiter count, thread count, and let whomever // stopped the threading stuff know that we're done @@ -254,8 +254,6 @@ static void timer_thread_cleanup(void) { if (0 == g_thread_count) { gpr_cv_signal(&g_cv_shutdown); } - completed_thread *ct = gpr_malloc(sizeof(*ct)); - ct->t = gpr_thd_currentid(); ct->next = g_completed_threads; g_completed_threads = ct; gpr_mu_unlock(&g_mu); @@ -264,14 +262,14 @@ static void timer_thread_cleanup(void) { } } -static void timer_thread(void *unused) { +static void timer_thread(void *completed_thread_ptr) { // this threads exec_ctx: we try to run things through to completion here // since it's easy to spin up new threads grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); timer_main_loop(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx); - timer_thread_cleanup(); + timer_thread_cleanup(completed_thread_ptr); } static void start_threads(void) { diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 4f204cfbf8..1ab82ef1d5 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -28,8 +28,9 @@ #include <uv.h> -grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false); -grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer"); +grpc_tracer_flag grpc_timer_check_trace = + GRPC_TRACER_INITIALIZER(false, "timer_check"); static void timer_close_callback(uv_handle_t *handle) { gpr_free(handle); } |