aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2017-07-13 18:36:05 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2017-07-13 18:36:05 -0700
commitce47215063da4103516a81b72721e4f698955b1b (patch)
treede343067923cc44288fb16ea54890fc9e7b8c0c5 /src/core/lib/iomgr
parent48d90c22723c74db9ea27f0415a2c75b2bc14070 (diff)
parentbd4439c4a63e5d3aa0bacc5d724ede28de7903e7 (diff)
Merge branch 'master' into sreek-epoll1
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/OWNERS1
-rw-r--r--src/core/lib/iomgr/closure.c2
-rw-r--r--src/core/lib/iomgr/combiner.c3
-rw-r--r--src/core/lib/iomgr/error.c3
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c56
-rw-r--r--src/core/lib/iomgr/ev_posix.c7
-rw-r--r--src/core/lib/iomgr/ev_windows.c2
-rw-r--r--src/core/lib/iomgr/iomgr_posix.c2
-rw-r--r--src/core/lib/iomgr/iomgr_uv.c2
-rw-r--r--src/core/lib/iomgr/pollset_uv.c3
-rw-r--r--src/core/lib/iomgr/pollset_windows.c3
-rw-r--r--src/core/lib/iomgr/resource_quota.c3
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--src/core/lib/iomgr/tcp_uv.c2
-rw-r--r--src/core/lib/iomgr/tcp_windows.c2
-rw-r--r--src/core/lib/iomgr/timer_generic.c9
-rw-r--r--src/core/lib/iomgr/timer_manager.c14
-rw-r--r--src/core/lib/iomgr/timer_uv.c5
18 files changed, 86 insertions, 35 deletions
diff --git a/src/core/lib/iomgr/OWNERS b/src/core/lib/iomgr/OWNERS
new file mode 100644
index 0000000000..8ee86c2618
--- /dev/null
+++ b/src/core/lib/iomgr/OWNERS
@@ -0,0 +1 @@
+@murgatroid99 *_uv.c *_uv.h
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index e028e72ed6..26f9cbe0fa 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -25,7 +25,7 @@
#include "src/core/lib/profiling/timers.h"
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_closure = GRPC_TRACER_INITIALIZER(false, "closure");
#endif
#ifndef NDEBUG
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 7f9c5d837f..c72c37e2b5 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -27,7 +27,8 @@
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/profiling/timers.h"
-grpc_tracer_flag grpc_combiner_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_combiner_trace =
+ GRPC_TRACER_INITIALIZER(false, "combiner");
#define GRPC_COMBINER_TRACE(fn) \
do { \
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index a95929a1fb..3759dda992 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -36,7 +36,8 @@
#include "src/core/lib/slice/slice_internal.h"
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_error_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_error_refcount =
+ GRPC_TRACER_INITIALIZER(false, "error_refcount");
#endif
static const char *error_int_name(grpc_error_ints key) {
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index ceecff7a88..2499417356 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -103,6 +103,32 @@ typedef struct pollable {
grpc_pollset_worker *root_worker;
} pollable;
+static const char *polling_obj_type_string(polling_obj_type t) {
+ switch (t) {
+ case PO_POLLING_GROUP:
+ return "polling_group";
+ case PO_POLLSET_SET:
+ return "pollset_set";
+ case PO_POLLSET:
+ return "pollset";
+ case PO_FD:
+ return "fd";
+ case PO_EMPTY_POLLABLE:
+ return "empty_pollable";
+ case PO_COUNT:
+ return "<invalid:count>";
+ }
+ return "<invalid>";
+}
+
+static char *pollable_desc(pollable *p) {
+ char *out;
+ gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d",
+ polling_obj_type_string(p->po.type), p->po.group, p->epfd,
+ p->wakeup.read_fd);
+ return out;
+}
+
static pollable g_empty_pollable;
static void pollable_init(pollable *p, polling_obj_type type);
@@ -472,7 +498,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
GPR_ASSERT(epfd != -1);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "add fd %p to pollable %p", fd, p);
+ gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
}
gpr_mu_lock(&fd->orphaned_mu);
@@ -537,10 +563,18 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
if (worker->pollable != &pollset->pollable) {
gpr_mu_lock(&worker->pollable->po.mu);
}
- if (worker->initialized_cv) {
+ if (worker->initialized_cv && worker != pollset->root_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)",
+ pollset, worker, &pollset->pollable, worker->pollable);
+ }
worker->kicked = true;
gpr_cv_signal(&worker->cv);
} else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)",
+ pollset, worker, &pollset->pollable, worker->pollable);
+ }
append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup),
"pollset_shutdown");
}
@@ -770,7 +804,9 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int timeout = poll_deadline_to_millis_timeout(deadline, now);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p poll %p for %dms", pollset, p, timeout);
+ char *desc = pollable_desc(p);
+ gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout);
+ gpr_free(desc);
}
if (timeout != 0) {
@@ -985,10 +1021,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
static const char *err_desc = "pollset_add_fd";
grpc_error *error = GRPC_ERROR_NONE;
if (pollset->current_pollable == &g_empty_pollable) {
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
"PS:%p add fd %p; transition pollable from empty to fd", pollset,
fd);
+ }
/* empty pollable --> single fd pollable */
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &fd->pollable;
@@ -997,16 +1034,23 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu);
REF_BY(fd, 2, "pollset_pollable");
} else if (pollset->current_pollable == &pollset->pollable) {
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd);
+ }
append_error(&error, pollable_add_fd(pollset->current_pollable, fd),
err_desc);
} else if (pollset->current_pollable != &fd->pollable) {
grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable;
- if (GRPC_TRACER_ON(grpc_polling_trace))
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG,
"PS:%p add fd %p; transition pollable from fd %p to multipoller",
pollset, fd, had_fd);
+ }
+ /* Introduce a spurious completion.
+ If we do not, then it may be that the fd-specific epoll set consumed
+ a completion without being polled, leading to a missed edge going up. */
+ grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure);
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &pollset->pollable;
if (append_error(&error, pollable_materialize(&pollset->pollable),
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 2648df393d..a5ae04cb5b 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -39,10 +39,11 @@
#include "src/core/lib/support/env.h"
grpc_tracer_flag grpc_polling_trace =
- GRPC_TRACER_INITIALIZER(false); /* Disabled by default */
+ GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_fd_refcount =
+ GRPC_TRACER_INITIALIZER(false, "fd_refcount");
#endif
/** Default poll() function - a pointer so that it can be overridden by some
@@ -124,7 +125,7 @@ void grpc_set_event_engine_test_only(
const char *grpc_get_poll_strategy_name() { return g_poll_strategy_name; }
void grpc_event_engine_init(void) {
- grpc_register_tracer("polling", &grpc_polling_trace);
+ grpc_register_tracer(&grpc_polling_trace);
char *s = gpr_getenv("GRPC_POLL_STRATEGY");
if (s == NULL) {
diff --git a/src/core/lib/iomgr/ev_windows.c b/src/core/lib/iomgr/ev_windows.c
index 027609c7e8..c24dfaeaf7 100644
--- a/src/core/lib/iomgr/ev_windows.c
+++ b/src/core/lib/iomgr/ev_windows.c
@@ -23,6 +23,6 @@
#include "src/core/lib/debug/trace.h"
grpc_tracer_flag grpc_polling_trace =
- GRPC_TRACER_INITIALIZER(false); /* Disabled by default */
+ GRPC_TRACER_INITIALIZER(false, "polling"); /* Disabled by default */
#endif // GRPC_WINSOCK_SOCKET
diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.c
index 43f5d0406e..f5875a247e 100644
--- a/src/core/lib/iomgr/iomgr_posix.c
+++ b/src/core/lib/iomgr/iomgr_posix.c
@@ -28,7 +28,7 @@
void grpc_iomgr_platform_init(void) {
grpc_wakeup_fd_global_init();
grpc_event_engine_init();
- grpc_register_tracer("tcp", &grpc_tcp_trace);
+ grpc_register_tracer(&grpc_tcp_trace);
}
void grpc_iomgr_platform_flush(void) {}
diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c
index 49d1a03c32..8b1245c640 100644
--- a/src/core/lib/iomgr/iomgr_uv.c
+++ b/src/core/lib/iomgr/iomgr_uv.c
@@ -26,7 +26,7 @@
void grpc_iomgr_platform_init(void) {
grpc_pollset_global_init();
- grpc_register_tracer("tcp", &grpc_tcp_trace);
+ grpc_register_tracer(&grpc_tcp_trace);
}
void grpc_iomgr_platform_flush(void) {}
void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index 1a54065a91..946f0c8c60 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -34,7 +34,8 @@
#include "src/core/lib/debug/trace.h"
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_fd_refcount =
+ GRPC_TRACER_INITIALIZER(false, "fd_refcount");
#endif
struct grpc_pollset {
diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c
index 1bfc2a22a8..ea017a6054 100644
--- a/src/core/lib/iomgr/pollset_windows.c
+++ b/src/core/lib/iomgr/pollset_windows.c
@@ -31,7 +31,8 @@
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
#ifndef NDEBUG
-grpc_tracer_flag grpc_trace_fd_refcount = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_trace_fd_refcount =
+ GRPC_TRACER_INITIALIZER(false, "fd_refcount");
#endif
gpr_mu grpc_polling_mu;
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index f2cc1be74e..a31d9eef93 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -29,7 +29,8 @@
#include "src/core/lib/iomgr/combiner.h"
-grpc_tracer_flag grpc_resource_quota_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_resource_quota_trace =
+ GRPC_TRACER_INITIALIZER(false, "resource_quota");
#define MEMORY_USAGE_ESTIMATION_MAX 65536
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 5de2b0f4ee..b6dcd15cb0 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -59,7 +59,7 @@ typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
typedef size_t msg_iovlen_type;
#endif
-grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp");
typedef struct {
grpc_endpoint base;
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index ff5fd3edc8..7c6a9b85f5 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -37,7 +37,7 @@
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
-grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp");
typedef struct {
grpc_endpoint base;
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 6704a158ce..2cbb97403b 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -48,7 +48,7 @@
#define GRPC_FIONBIO FIONBIO
#endif
-grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp");
static grpc_error *set_non_block(SOCKET sock) {
int status;
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index e6a9eb0e86..12efce241f 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -41,8 +41,9 @@
#define MIN_QUEUE_WINDOW_DURATION 0.01
#define MAX_QUEUE_WINDOW_DURATION 1
-grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false);
-grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer");
+grpc_tracer_flag grpc_timer_check_trace =
+ GRPC_TRACER_INITIALIZER(false, "timer_check");
/* A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
* deadlines earlier than 'queue_deadline" cap are maintained in the heap and
@@ -160,8 +161,8 @@ void grpc_timer_list_init(gpr_timespec now) {
g_shared_mutables.min_timer = timespec_to_atm_round_down(now);
gpr_tls_init(&g_last_seen_min_timer);
gpr_tls_set(&g_last_seen_min_timer, 0);
- grpc_register_tracer("timer", &grpc_timer_trace);
- grpc_register_tracer("timer_check", &grpc_timer_check_trace);
+ grpc_register_tracer(&grpc_timer_trace);
+ grpc_register_tracer(&grpc_timer_check_trace);
for (i = 0; i < NUM_SHARDS; i++) {
timer_shard *shard = &g_shards[i];
diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c
index cb7998db97..b9bea9a2ab 100644
--- a/src/core/lib/iomgr/timer_manager.c
+++ b/src/core/lib/iomgr/timer_manager.c
@@ -56,7 +56,7 @@ static gpr_timespec g_timed_waiter_deadline;
// generation counter to track which thread is waiting for the next timer
static uint64_t g_timed_waiter_generation;
-static void timer_thread(void *unused);
+static void timer_thread(void *completed_thread_ptr);
static void gc_completed_threads(void) {
if (g_completed_threads != NULL) {
@@ -81,10 +81,10 @@ static void start_timer_thread_and_unlock(void) {
if (GRPC_TRACER_ON(grpc_timer_check_trace)) {
gpr_log(GPR_DEBUG, "Spawn timer thread");
}
- gpr_thd_id thd;
gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
- gpr_thd_new(&thd, timer_thread, NULL, &opt);
+ completed_thread *ct = gpr_malloc(sizeof(*ct));
+ gpr_thd_new(&ct->t, timer_thread, ct, &opt);
}
void grpc_timer_manager_tick() {
@@ -245,7 +245,7 @@ static void timer_main_loop(grpc_exec_ctx *exec_ctx) {
}
}
-static void timer_thread_cleanup(void) {
+static void timer_thread_cleanup(completed_thread *ct) {
gpr_mu_lock(&g_mu);
// terminate the thread: drop the waiter count, thread count, and let whomever
// stopped the threading stuff know that we're done
@@ -254,8 +254,6 @@ static void timer_thread_cleanup(void) {
if (0 == g_thread_count) {
gpr_cv_signal(&g_cv_shutdown);
}
- completed_thread *ct = gpr_malloc(sizeof(*ct));
- ct->t = gpr_thd_currentid();
ct->next = g_completed_threads;
g_completed_threads = ct;
gpr_mu_unlock(&g_mu);
@@ -264,14 +262,14 @@ static void timer_thread_cleanup(void) {
}
}
-static void timer_thread(void *unused) {
+static void timer_thread(void *completed_thread_ptr) {
// this threads exec_ctx: we try to run things through to completion here
// since it's easy to spin up new threads
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
timer_main_loop(&exec_ctx);
grpc_exec_ctx_finish(&exec_ctx);
- timer_thread_cleanup();
+ timer_thread_cleanup(completed_thread_ptr);
}
static void start_threads(void) {
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index 4f204cfbf8..1ab82ef1d5 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -28,8 +28,9 @@
#include <uv.h>
-grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false);
-grpc_tracer_flag grpc_timer_check_trace = GRPC_TRACER_INITIALIZER(false);
+grpc_tracer_flag grpc_timer_trace = GRPC_TRACER_INITIALIZER(false, "timer");
+grpc_tracer_flag grpc_timer_check_trace =
+ GRPC_TRACER_INITIALIZER(false, "timer_check");
static void timer_close_callback(uv_handle_t *handle) { gpr_free(handle); }