aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-09-20 08:28:28 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-09-20 08:28:28 -0700
commit531b251abcb6085a4174748376edc048742dd35b (patch)
treec135c91311c795bd066f626994cf88da6e08be4f /src/core/lib/iomgr
parent2c48148ebdcdfb62bc24c387f29d986606dc1bcd (diff)
parenteb44079fcd452e2f94066ffe0944c1390987b45e (diff)
Merge github.com:grpc/grpc into flowctlN
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c108
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c76
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.c64
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c102
-rw-r--r--src/core/lib/iomgr/ev_posix.c4
-rw-r--r--src/core/lib/iomgr/ev_posix.h2
-rw-r--r--src/core/lib/iomgr/executor.c69
-rw-r--r--src/core/lib/iomgr/iomgr.c2
-rw-r--r--src/core/lib/iomgr/is_epollexclusive_available.c12
-rw-r--r--src/core/lib/iomgr/pollset.h2
-rw-r--r--src/core/lib/iomgr/pollset_uv.c2
-rw-r--r--src/core/lib/iomgr/pollset_windows.c6
-rw-r--r--src/core/lib/iomgr/socket_factory_posix.c4
-rw-r--r--src/core/lib/iomgr/socket_mutator.c4
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c4
-rw-r--r--src/core/lib/iomgr/timer_generic.c5
16 files changed, 258 insertions, 208 deletions
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 0d57833d85..3ac12ab56f 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -280,8 +280,9 @@ static grpc_fd *fd_create(int fd, const char *name) {
#endif
gpr_free(fd_name);
- struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
- .data.ptr = new_fd};
+ struct epoll_event ev;
+ ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
+ ev.data.ptr = new_fd;
if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
@@ -435,8 +436,9 @@ static grpc_error *pollset_global_init(void) {
global_wakeup_fd.read_fd = -1;
grpc_error *err = grpc_wakeup_fd_init(&global_wakeup_fd);
if (err != GRPC_ERROR_NONE) return err;
- struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
- .data.ptr = &global_wakeup_fd};
+ struct epoll_event ev;
+ ev.events = (uint32_t)(EPOLLIN | EPOLLET);
+ ev.data.ptr = &global_wakeup_fd;
if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
&ev) != 0) {
return GRPC_OS_ERROR(errno, "epoll_ctl");
@@ -502,22 +504,27 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
-static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
+static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset) {
GPR_TIMER_BEGIN("pollset_kick_all", 0);
grpc_error *error = GRPC_ERROR_NONE;
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
+ GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
switch (worker->state) {
case KICKED:
+ GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
break;
case UNKICKED:
SET_KICK_STATE(worker, KICKED);
if (worker->initialized_cv) {
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
gpr_cv_signal(&worker->cv);
}
break;
case DESIGNATED_POLLER:
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
SET_KICK_STATE(worker, KICKED);
append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
"pollset_kick_all");
@@ -550,7 +557,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_ASSERT(!pollset->shutting_down);
pollset->shutdown_closure = closure;
pollset->shutting_down = true;
- GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
+ GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset));
pollset_maybe_finish_shutdown(exec_ctx, pollset);
GPR_TIMER_END("pollset_shutdown", 0);
}
@@ -567,7 +574,10 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
}
static const gpr_timespec round_up = {
- .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
+ 0, /* tv_sec */
+ GPR_NS_PER_MS - 1, /* tv_nsec */
+ GPR_TIMESPAN /* clock_type */
+ };
timeout = gpr_time_sub(deadline, now);
int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
return millis >= 1 ? millis : 1;
@@ -646,6 +656,8 @@ static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
+ GRPC_STATS_INC_POLL_EVENTS_RETURNED(exec_ctx, r);
+
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
}
@@ -782,7 +794,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
}
static bool check_neighborhood_for_available_poller(
- pollset_neighborhood *neighborhood) {
+ grpc_exec_ctx *exec_ctx, pollset_neighborhood *neighborhood) {
GPR_TIMER_BEGIN("check_neighborhood_for_available_poller", 0);
bool found_worker = false;
do {
@@ -806,6 +818,7 @@ static bool check_neighborhood_for_available_poller(
SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
if (inspect_worker->initialized_cv) {
GPR_TIMER_MARK("signal worker", 0);
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
gpr_cv_signal(&inspect_worker->cv);
}
} else {
@@ -865,6 +878,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_ASSERT(worker->next->initialized_cv);
gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
gpr_cv_signal(&worker->next->cv);
if (grpc_exec_ctx_has_work(exec_ctx)) {
gpr_mu_unlock(&pollset->mu);
@@ -883,7 +897,8 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
&g_neighborhoods[(poller_neighborhood_idx + i) %
g_num_neighborhoods];
if (gpr_mu_trylock(&neighborhood->mu)) {
- found_worker = check_neighborhood_for_available_poller(neighborhood);
+ found_worker =
+ check_neighborhood_for_available_poller(exec_ctx, neighborhood);
gpr_mu_unlock(&neighborhood->mu);
scan_state[i] = true;
} else {
@@ -896,7 +911,8 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
&g_neighborhoods[(poller_neighborhood_idx + i) %
g_num_neighborhoods];
gpr_mu_lock(&neighborhood->mu);
- found_worker = check_neighborhood_for_available_poller(neighborhood);
+ found_worker =
+ check_neighborhood_for_available_poller(exec_ctx, neighborhood);
gpr_mu_unlock(&neighborhood->mu);
}
grpc_exec_ctx_flush(exec_ctx);
@@ -978,9 +994,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
return error;
}
-static grpc_error *pollset_kick(grpc_pollset *pollset,
+static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
GPR_TIMER_BEGIN("pollset_kick", 0);
+ GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
grpc_error *ret_err = GRPC_ERROR_NONE;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_strvec log;
@@ -1013,6 +1030,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
grpc_pollset_worker *root_worker = pollset->root_worker;
if (root_worker == NULL) {
+ GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx);
pollset->kicked_without_poller = true;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked_without_poller");
@@ -1021,12 +1039,14 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
}
grpc_pollset_worker *next_worker = root_worker->next;
if (root_worker->state == KICKED) {
+ GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
goto done;
} else if (next_worker->state == KICKED) {
+ GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
}
@@ -1037,6 +1057,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
// there is no next worker
root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
&g_active_poller)) {
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
}
@@ -1044,6 +1065,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
goto done;
} else if (next_worker->state == UNKICKED) {
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
}
@@ -1061,10 +1083,12 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
}
SET_KICK_STATE(root_worker, KICKED);
if (root_worker->initialized_cv) {
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
gpr_cv_signal(&root_worker->cv);
}
goto done;
} else {
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
root_worker);
@@ -1074,11 +1098,13 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
goto done;
}
} else {
+ GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
GPR_ASSERT(next_worker->state == KICKED);
SET_KICK_STATE(next_worker, KICKED);
goto done;
}
} else {
+ GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked while waking up");
}
@@ -1095,6 +1121,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
goto done;
} else if (gpr_tls_get(&g_current_thread_worker) ==
(intptr_t)specific_worker) {
+ GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
}
@@ -1102,6 +1129,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
goto done;
} else if (specific_worker ==
(grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kick active poller");
}
@@ -1109,6 +1137,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
goto done;
} else if (specific_worker->initialized_cv) {
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kick waiting worker");
}
@@ -1116,6 +1145,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
gpr_cv_signal(&specific_worker->cv);
goto done;
} else {
+ GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kick non-waiting worker");
}
@@ -1172,34 +1202,34 @@ static void shutdown_engine(void) {
}
static const grpc_event_engine_vtable vtable = {
- .pollset_size = sizeof(grpc_pollset),
-
- .fd_create = fd_create,
- .fd_wrapped_fd = fd_wrapped_fd,
- .fd_orphan = fd_orphan,
- .fd_shutdown = fd_shutdown,
- .fd_is_shutdown = fd_is_shutdown,
- .fd_notify_on_read = fd_notify_on_read,
- .fd_notify_on_write = fd_notify_on_write,
- .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-
- .pollset_init = pollset_init,
- .pollset_shutdown = pollset_shutdown,
- .pollset_destroy = pollset_destroy,
- .pollset_work = pollset_work,
- .pollset_kick = pollset_kick,
- .pollset_add_fd = pollset_add_fd,
-
- .pollset_set_create = pollset_set_create,
- .pollset_set_destroy = pollset_set_destroy,
- .pollset_set_add_pollset = pollset_set_add_pollset,
- .pollset_set_del_pollset = pollset_set_del_pollset,
- .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
- .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
- .pollset_set_add_fd = pollset_set_add_fd,
- .pollset_set_del_fd = pollset_set_del_fd,
-
- .shutdown_engine = shutdown_engine,
+ sizeof(grpc_pollset),
+
+ fd_create,
+ fd_wrapped_fd,
+ fd_orphan,
+ fd_shutdown,
+ fd_notify_on_read,
+ fd_notify_on_write,
+ fd_is_shutdown,
+ fd_get_read_notifier_pollset,
+
+ pollset_init,
+ pollset_shutdown,
+ pollset_destroy,
+ pollset_work,
+ pollset_kick,
+ pollset_add_fd,
+
+ pollset_set_create,
+ pollset_set_destroy,
+ pollset_set_add_pollset,
+ pollset_set_del_pollset,
+ pollset_set_add_pollset_set,
+ pollset_set_del_pollset_set,
+ pollset_set_add_fd,
+ pollset_set_del_fd,
+
+ shutdown_engine,
};
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 51200fc064..8eb4de44d9 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -477,8 +477,9 @@ static grpc_error *pollable_materialize(pollable *p) {
close(new_epfd);
return err;
}
- struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
- .data.ptr = (void *)(1 | (intptr_t)&p->wakeup)};
+ struct epoll_event ev;
+ ev.events = (uint32_t)(EPOLLIN | EPOLLET);
+ ev.data.ptr = (void *)(1 | (intptr_t)&p->wakeup);
if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) {
err = GRPC_OS_ERROR(errno, "epoll_ctl");
close(new_epfd);
@@ -507,9 +508,9 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
gpr_mu_unlock(&fd->orphaned_mu);
return GRPC_ERROR_NONE;
}
- struct epoll_event ev_fd = {
- .events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE),
- .data.ptr = fd};
+ struct epoll_event ev_fd;
+ ev_fd.events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
+ ev_fd.data.ptr = fd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
switch (errno) {
case EEXIST:
@@ -561,6 +562,7 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
+ GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
if (worker->pollable_obj != &pollset->pollable_obj) {
gpr_mu_lock(&worker->pollable_obj->po.mu);
}
@@ -665,9 +667,10 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
}
/* p->po.mu must be held before calling this function */
-static grpc_error *pollset_kick(grpc_pollset *pollset,
+static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
pollable *p = pollset->current_pollable_obj;
+ GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
if (p != &pollset->pollable_obj) {
gpr_mu_lock(&p->po.mu);
}
@@ -706,7 +709,10 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
}
static const gpr_timespec round_up = {
- .clock_type = GPR_TIMESPAN, .tv_sec = 0, .tv_nsec = GPR_NS_PER_MS - 1};
+ 0, /* tv_sec */
+ GPR_NS_PER_MS - 1, /* tv_nsec */
+ GPR_TIMESPAN /* clock_type */
+ };
timeout = gpr_time_sub(deadline, now);
int millis = gpr_time_to_millis(gpr_time_add(timeout, round_up));
return millis >= 1 ? millis : 1;
@@ -1390,34 +1396,34 @@ static void shutdown_engine(void) {
}
static const grpc_event_engine_vtable vtable = {
- .pollset_size = sizeof(grpc_pollset),
-
- .fd_create = fd_create,
- .fd_wrapped_fd = fd_wrapped_fd,
- .fd_orphan = fd_orphan,
- .fd_shutdown = fd_shutdown,
- .fd_is_shutdown = fd_is_shutdown,
- .fd_notify_on_read = fd_notify_on_read,
- .fd_notify_on_write = fd_notify_on_write,
- .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-
- .pollset_init = pollset_init,
- .pollset_shutdown = pollset_shutdown,
- .pollset_destroy = pollset_destroy,
- .pollset_work = pollset_work,
- .pollset_kick = pollset_kick,
- .pollset_add_fd = pollset_add_fd,
-
- .pollset_set_create = pollset_set_create,
- .pollset_set_destroy = pollset_set_destroy,
- .pollset_set_add_pollset = pollset_set_add_pollset,
- .pollset_set_del_pollset = pollset_set_del_pollset,
- .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
- .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
- .pollset_set_add_fd = pollset_set_add_fd,
- .pollset_set_del_fd = pollset_set_del_fd,
-
- .shutdown_engine = shutdown_engine,
+ sizeof(grpc_pollset),
+
+ fd_create,
+ fd_wrapped_fd,
+ fd_orphan,
+ fd_shutdown,
+ fd_notify_on_read,
+ fd_notify_on_write,
+ fd_is_shutdown,
+ fd_get_read_notifier_pollset,
+
+ pollset_init,
+ pollset_shutdown,
+ pollset_destroy,
+ pollset_work,
+ pollset_kick,
+ pollset_add_fd,
+
+ pollset_set_create,
+ pollset_set_destroy,
+ pollset_set_add_pollset,
+ pollset_set_del_pollset,
+ pollset_set_add_pollset_set,
+ pollset_set_del_pollset_set,
+ pollset_set_add_fd,
+ pollset_set_del_fd,
+
+ shutdown_engine,
};
const grpc_event_engine_vtable *grpc_init_epollex_linux(
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index b88c3ba111..4d8bdf1401 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -1021,10 +1021,11 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
}
/* p->mu must be held before calling this function */
-static grpc_error *pollset_kick(grpc_pollset *p,
+static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *p,
grpc_pollset_worker *specific_worker) {
GPR_TIMER_BEGIN("pollset_kick", 0);
grpc_error *error = GRPC_ERROR_NONE;
+ GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
const char *err_desc = "Kick Failure";
grpc_pollset_worker *worker = specific_worker;
if (worker != NULL) {
@@ -1132,7 +1133,8 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
- grpc_pollset *ps, char *reason) {
+ grpc_pollset *ps,
+ const char *reason) {
if (ps->po.pi != NULL) {
PI_UNREF(exec_ctx, ps->po.pi, reason);
}
@@ -1158,7 +1160,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = true;
pollset->shutdown_done = closure;
- pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
+ pollset_kick(exec_ctx, pollset, GRPC_POLLSET_KICK_BROADCAST);
/* If the pollset has any workers, we cannot call finish_shutdown_locked()
because it would release the underlying polling island. In such a case, we
@@ -1670,34 +1672,34 @@ static void shutdown_engine(void) {
}
static const grpc_event_engine_vtable vtable = {
- .pollset_size = sizeof(grpc_pollset),
-
- .fd_create = fd_create,
- .fd_wrapped_fd = fd_wrapped_fd,
- .fd_orphan = fd_orphan,
- .fd_shutdown = fd_shutdown,
- .fd_is_shutdown = fd_is_shutdown,
- .fd_notify_on_read = fd_notify_on_read,
- .fd_notify_on_write = fd_notify_on_write,
- .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-
- .pollset_init = pollset_init,
- .pollset_shutdown = pollset_shutdown,
- .pollset_destroy = pollset_destroy,
- .pollset_work = pollset_work,
- .pollset_kick = pollset_kick,
- .pollset_add_fd = pollset_add_fd,
-
- .pollset_set_create = pollset_set_create,
- .pollset_set_destroy = pollset_set_destroy,
- .pollset_set_add_pollset = pollset_set_add_pollset,
- .pollset_set_del_pollset = pollset_set_del_pollset,
- .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
- .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
- .pollset_set_add_fd = pollset_set_add_fd,
- .pollset_set_del_fd = pollset_set_del_fd,
-
- .shutdown_engine = shutdown_engine,
+ sizeof(grpc_pollset),
+
+ fd_create,
+ fd_wrapped_fd,
+ fd_orphan,
+ fd_shutdown,
+ fd_notify_on_read,
+ fd_notify_on_write,
+ fd_is_shutdown,
+ fd_get_read_notifier_pollset,
+
+ pollset_init,
+ pollset_shutdown,
+ pollset_destroy,
+ pollset_work,
+ pollset_kick,
+ pollset_add_fd,
+
+ pollset_set_create,
+ pollset_set_destroy,
+ pollset_set_add_pollset,
+ pollset_set_del_pollset,
+ pollset_set_add_pollset_set,
+ pollset_set_del_pollset_set,
+ pollset_set_add_fd,
+ pollset_set_del_fd,
+
+ shutdown_engine,
};
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 7f44eda138..e170702dca 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -209,7 +209,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
/* As per pollset_kick, with an extended set of flags (defined above)
-- mostly for fd_posix's use. */
-static grpc_error *pollset_kick_ext(grpc_pollset *p,
+static grpc_error *pollset_kick_ext(grpc_exec_ctx *exec_ctx, grpc_pollset *p,
grpc_pollset_worker *specific_worker,
uint32_t flags) GRPC_MUST_USE_RESULT;
@@ -365,36 +365,39 @@ static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
return notifier;
}
-static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) {
+static grpc_error *pollset_kick_locked(grpc_exec_ctx *exec_ctx,
+ grpc_fd_watcher *watcher) {
gpr_mu_lock(&watcher->pollset->mu);
GPR_ASSERT(watcher->worker);
- grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker,
- GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
+ grpc_error *err =
+ pollset_kick_ext(exec_ctx, watcher->pollset, watcher->worker,
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
gpr_mu_unlock(&watcher->pollset->mu);
return err;
}
-static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+static void maybe_wake_one_watcher_locked(grpc_exec_ctx *exec_ctx,
+ grpc_fd *fd) {
if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
- pollset_kick_locked(fd->inactive_watcher_root.next);
+ pollset_kick_locked(exec_ctx, fd->inactive_watcher_root.next);
} else if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher);
+ pollset_kick_locked(exec_ctx, fd->read_watcher);
} else if (fd->write_watcher) {
- pollset_kick_locked(fd->write_watcher);
+ pollset_kick_locked(exec_ctx, fd->write_watcher);
}
}
-static void wake_all_watchers_locked(grpc_fd *fd) {
+static void wake_all_watchers_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_fd_watcher *watcher;
for (watcher = fd->inactive_watcher_root.next;
watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
- pollset_kick_locked(watcher);
+ pollset_kick_locked(exec_ctx, watcher);
}
if (fd->read_watcher) {
- pollset_kick_locked(fd->read_watcher);
+ pollset_kick_locked(exec_ctx, fd->read_watcher);
}
if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
- pollset_kick_locked(fd->write_watcher);
+ pollset_kick_locked(exec_ctx, fd->write_watcher);
}
}
@@ -435,7 +438,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
if (!has_watchers(fd)) {
close_fd_locked(exec_ctx, fd);
} else {
- wake_all_watchers_locked(fd);
+ wake_all_watchers_locked(exec_ctx, fd);
}
gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* drop the reference */
@@ -479,7 +482,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
GRPC_CLOSURE_SCHED(exec_ctx, closure, fd_shutdown_error(fd));
- maybe_wake_one_watcher_locked(fd);
+ maybe_wake_one_watcher_locked(exec_ctx, fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
@@ -648,7 +651,7 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
}
}
if (kick) {
- maybe_wake_one_watcher_locked(fd);
+ maybe_wake_one_watcher_locked(exec_ctx, fd);
}
if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
close_fd_locked(exec_ctx, fd);
@@ -712,11 +715,12 @@ static void kick_append_error(grpc_error **composite, grpc_error *error) {
*composite = grpc_error_add_child(*composite, error);
}
-static grpc_error *pollset_kick_ext(grpc_pollset *p,
+static grpc_error *pollset_kick_ext(grpc_exec_ctx *exec_ctx, grpc_pollset *p,
grpc_pollset_worker *specific_worker,
uint32_t flags) {
GPR_TIMER_BEGIN("pollset_kick_ext", 0);
grpc_error *error = GRPC_ERROR_NONE;
+ GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
/* pollset->mu already held */
if (specific_worker != NULL) {
@@ -782,9 +786,9 @@ static grpc_error *pollset_kick_ext(grpc_pollset *p,
return error;
}
-static grpc_error *pollset_kick(grpc_pollset *p,
+static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *p,
grpc_pollset_worker *specific_worker) {
- return pollset_kick_ext(p, specific_worker, 0);
+ return pollset_kick_ext(exec_ctx, p, specific_worker, 0);
}
/* global state management */
@@ -847,7 +851,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
pollset->fds[pollset->fd_count++] = fd;
GRPC_FD_REF(fd, "multipoller");
- pollset_kick(pollset, NULL);
+ pollset_kick(exec_ctx, pollset, NULL);
exit:
gpr_mu_unlock(&pollset->mu);
}
@@ -1083,7 +1087,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* check shutdown conditions */
if (pollset->shutting_down) {
if (pollset_has_workers(pollset)) {
- pollset_kick(pollset, NULL);
+ pollset_kick(exec_ctx, pollset, NULL);
} else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
@@ -1112,7 +1116,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_ASSERT(!pollset->shutting_down);
pollset->shutting_down = 1;
pollset->shutdown_done = closure;
- pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
+ pollset_kick(exec_ctx, pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset_has_workers(pollset)) {
GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs);
}
@@ -1688,34 +1692,34 @@ static void shutdown_engine(void) {
}
static const grpc_event_engine_vtable vtable = {
- .pollset_size = sizeof(grpc_pollset),
-
- .fd_create = fd_create,
- .fd_wrapped_fd = fd_wrapped_fd,
- .fd_orphan = fd_orphan,
- .fd_shutdown = fd_shutdown,
- .fd_is_shutdown = fd_is_shutdown,
- .fd_notify_on_read = fd_notify_on_read,
- .fd_notify_on_write = fd_notify_on_write,
- .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-
- .pollset_init = pollset_init,
- .pollset_shutdown = pollset_shutdown,
- .pollset_destroy = pollset_destroy,
- .pollset_work = pollset_work,
- .pollset_kick = pollset_kick,
- .pollset_add_fd = pollset_add_fd,
-
- .pollset_set_create = pollset_set_create,
- .pollset_set_destroy = pollset_set_destroy,
- .pollset_set_add_pollset = pollset_set_add_pollset,
- .pollset_set_del_pollset = pollset_set_del_pollset,
- .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
- .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
- .pollset_set_add_fd = pollset_set_add_fd,
- .pollset_set_del_fd = pollset_set_del_fd,
-
- .shutdown_engine = shutdown_engine,
+ sizeof(grpc_pollset),
+
+ fd_create,
+ fd_wrapped_fd,
+ fd_orphan,
+ fd_shutdown,
+ fd_notify_on_read,
+ fd_notify_on_write,
+ fd_is_shutdown,
+ fd_get_read_notifier_pollset,
+
+ pollset_init,
+ pollset_shutdown,
+ pollset_destroy,
+ pollset_work,
+ pollset_kick,
+ pollset_add_fd,
+
+ pollset_set_create,
+ pollset_set_destroy,
+ pollset_set_add_pollset,
+ pollset_set_del_pollset,
+ pollset_set_add_pollset_set,
+ pollset_set_del_pollset_set,
+ pollset_set_add_fd,
+ pollset_set_del_fd,
+
+ shutdown_engine,
};
const grpc_event_engine_vtable *grpc_init_poll_posix(bool explicit_request) {
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index d881e2d4dd..4d3ae2228e 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -210,9 +210,9 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline);
}
-grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
+grpc_error *grpc_pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
- return g_event_engine->pollset_kick(pollset, specific_worker);
+ return g_event_engine->pollset_kick(exec_ctx, pollset, specific_worker);
}
void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 1108e46ef8..1ff2ff1413 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -54,7 +54,7 @@ typedef struct grpc_event_engine_vtable {
grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker, gpr_timespec now,
gpr_timespec deadline);
- grpc_error *(*pollset_kick)(grpc_pollset *pollset,
+ grpc_error *(*pollset_kick)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *specific_worker);
void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
struct grpc_fd *fd);
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 892385d7d7..2439f15a8a 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -32,16 +32,14 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/support/spinlock.h"
-#define MAX_DEPTH 2
-
typedef struct {
gpr_mu mu;
gpr_cv cv;
grpc_closure_list elems;
- size_t depth;
bool shutdown;
bool queued_long_job;
gpr_thd_id id;
+ grpc_closure_list local_elems;
} thread_state;
static thread_state *g_thread_state;
@@ -56,32 +54,35 @@ static grpc_tracer_flag executor_trace =
static void executor_thread(void *arg);
-static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
- size_t n = 0;
+static void run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
+ int n = 0; // number of closures executed
- grpc_closure *c = list.head;
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error_data.error;
- if (GRPC_TRACER_ON(executor_trace)) {
+ while (!grpc_closure_list_empty(*list)) {
+ grpc_closure *c = list->head;
+ grpc_closure_list_init(list);
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ if (GRPC_TRACER_ON(executor_trace)) {
#ifndef NDEBUG
- gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
- c->file_created, c->line_created);
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
+ c->file_created, c->line_created);
#else
- gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
+ gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
#endif
- }
+ }
#ifndef NDEBUG
- c->scheduled = false;
+ c->scheduled = false;
#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- n++;
- grpc_exec_ctx_flush(exec_ctx);
+ n++;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ grpc_exec_ctx_flush(exec_ctx);
+ }
}
- return n;
+ GRPC_STATS_INC_EXECUTOR_CLOSURES_PER_WAKEUP(exec_ctx, n);
}
bool grpc_executor_is_threaded() {
@@ -126,7 +127,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
for (size_t i = 0; i < g_max_threads; i++) {
gpr_mu_destroy(&g_thread_state[i].mu);
gpr_cv_destroy(&g_thread_state[i].cv);
- run_closures(exec_ctx, g_thread_state[i].elems);
+ run_closures(exec_ctx, &g_thread_state[i].elems);
}
gpr_free(g_thread_state);
gpr_tls_destroy(&g_this_thread_state);
@@ -150,14 +151,14 @@ static void executor_thread(void *arg) {
grpc_exec_ctx exec_ctx =
GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL);
- size_t subtract_depth = 0;
+ GRPC_STATS_INC_EXECUTOR_THREADS_CREATED(&exec_ctx);
+
+ bool used = false;
for (;;) {
if (GRPC_TRACER_ON(executor_trace)) {
- gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")",
- (int)(ts - g_thread_state), subtract_depth);
+ gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step", (int)(ts - g_thread_state));
}
gpr_mu_lock(&ts->mu);
- ts->depth -= subtract_depth;
while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
ts->queued_long_job = false;
gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
@@ -170,15 +171,20 @@ static void executor_thread(void *arg) {
gpr_mu_unlock(&ts->mu);
break;
}
+ if (!used) {
+ GRPC_STATS_INC_EXECUTOR_THREADS_USED(&exec_ctx);
+ used = true;
+ }
GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED(&exec_ctx);
- grpc_closure_list exec = ts->elems;
+ GPR_ASSERT(grpc_closure_list_empty(ts->local_elems));
+ ts->local_elems = ts->elems;
ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
gpr_mu_unlock(&ts->mu);
if (GRPC_TRACER_ON(executor_trace)) {
gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state));
}
- subtract_depth = run_closures(&exec_ctx, exec);
+ run_closures(&exec_ctx, &ts->local_elems);
}
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -211,6 +217,10 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
} else {
GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx);
+ if (is_short) {
+ grpc_closure_list_append(&ts->local_elems, closure, error);
+ return;
+ }
}
thread_state *orig_ts = ts;
@@ -250,8 +260,7 @@ static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_cv_signal(&ts->cv);
}
grpc_closure_list_append(&ts->elems, closure, error);
- ts->depth++;
- try_new_thread = ts->depth > MAX_DEPTH &&
+ try_new_thread = ts->elems.head != closure &&
cur_thread_count < g_max_threads && !ts->shutdown;
if (!is_short) ts->queued_long_job = true;
gpr_mu_unlock(&ts->mu);
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 1feea6d628..f63f190155 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -50,7 +50,7 @@ void grpc_iomgr_init(grpc_exec_ctx *exec_ctx) {
grpc_executor_init(exec_ctx);
grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
g_root_object.next = g_root_object.prev = &g_root_object;
- g_root_object.name = "root";
+ g_root_object.name = (char *)"root";
grpc_network_status_init();
grpc_iomgr_platform_init();
}
diff --git a/src/core/lib/iomgr/is_epollexclusive_available.c b/src/core/lib/iomgr/is_epollexclusive_available.c
index e8a7d4d52c..d08844c0df 100644
--- a/src/core/lib/iomgr/is_epollexclusive_available.c
+++ b/src/core/lib/iomgr/is_epollexclusive_available.c
@@ -57,12 +57,12 @@ bool grpc_is_epollexclusive_available(void) {
close(fd);
return false;
}
- struct epoll_event ev = {
- /* choose events that should cause an error on
- EPOLLEXCLUSIVE enabled kernels - specifically the combination of
- EPOLLONESHOT and EPOLLEXCLUSIVE */
- .events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLEXCLUSIVE | EPOLLONESHOT),
- .data.ptr = NULL};
+ struct epoll_event ev;
+ /* choose events that should cause an error on
+ EPOLLEXCLUSIVE enabled kernels - specifically the combination of
+ EPOLLONESHOT and EPOLLEXCLUSIVE */
+ ev.events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLEXCLUSIVE | EPOLLONESHOT);
+ ev.data.ptr = NULL;
if (epoll_ctl(fd, EPOLL_CTL_ADD, evfd, &ev) != 0) {
if (errno != EINVAL) {
if (!logged_why_not) {
diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h
index a609a3877a..a0f6b3a9d3 100644
--- a/src/core/lib/iomgr/pollset.h
+++ b/src/core/lib/iomgr/pollset.h
@@ -76,7 +76,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* Break one polling thread out of polling work for this pollset.
If specific_worker is non-NULL, then kick that worker. */
-grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
+grpc_error *grpc_pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *specific_worker)
GRPC_MUST_USE_RESULT;
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index a79fe89d3e..2651325e25 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -145,7 +145,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
return GRPC_ERROR_NONE;
}
-grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
+grpc_error *grpc_pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
GRPC_UV_ASSERT_SAME_THREAD();
uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c
index ea017a6054..eb295d3eeb 100644
--- a/src/core/lib/iomgr/pollset_windows.c
+++ b/src/core/lib/iomgr/pollset_windows.c
@@ -98,7 +98,7 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
pollset->shutting_down = 1;
- grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
+ grpc_pollset_kick(exec_ctx, pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
@@ -181,7 +181,7 @@ done:
return GRPC_ERROR_NONE;
}
-grpc_error *grpc_pollset_kick(grpc_pollset *p,
+grpc_error *grpc_pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *p,
grpc_pollset_worker *specific_worker) {
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
@@ -209,7 +209,7 @@ grpc_error *grpc_pollset_kick(grpc_pollset *p,
specific_worker =
pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
if (specific_worker != NULL) {
- grpc_pollset_kick(p, specific_worker);
+ grpc_pollset_kick(exec_ctx, p, specific_worker);
} else if (p->is_iocp_worker) {
grpc_iocp_kick();
} else {
diff --git a/src/core/lib/iomgr/socket_factory_posix.c b/src/core/lib/iomgr/socket_factory_posix.c
index c81566575e..8e907703ae 100644
--- a/src/core/lib/iomgr/socket_factory_posix.c
+++ b/src/core/lib/iomgr/socket_factory_posix.c
@@ -85,8 +85,8 @@ static const grpc_arg_pointer_vtable socket_factory_arg_vtable = {
socket_factory_arg_copy, socket_factory_arg_destroy, socket_factory_cmp};
grpc_arg grpc_socket_factory_to_arg(grpc_socket_factory *factory) {
- return grpc_channel_arg_pointer_create(GRPC_ARG_SOCKET_FACTORY, factory,
- &socket_factory_arg_vtable);
+ return grpc_channel_arg_pointer_create((char *)GRPC_ARG_SOCKET_FACTORY,
+ factory, &socket_factory_arg_vtable);
}
#endif
diff --git a/src/core/lib/iomgr/socket_mutator.c b/src/core/lib/iomgr/socket_mutator.c
index 300ac75b38..b0435d5a07 100644
--- a/src/core/lib/iomgr/socket_mutator.c
+++ b/src/core/lib/iomgr/socket_mutator.c
@@ -76,6 +76,6 @@ static const grpc_arg_pointer_vtable socket_mutator_arg_vtable = {
socket_mutator_arg_copy, socket_mutator_arg_destroy, socket_mutator_cmp};
grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator) {
- return grpc_channel_arg_pointer_create(GRPC_ARG_SOCKET_MUTATOR, mutator,
- &socket_mutator_arg_vtable);
+ return grpc_channel_arg_pointer_create((char *)GRPC_ARG_SOCKET_MUTATOR,
+ mutator, &socket_mutator_arg_vtable);
}
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index c3ec3e447a..06612d639c 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -198,12 +198,12 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
/* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = (grpc_tcp_listener *)arg;
-
+ grpc_pollset *read_notifier_pollset;
if (err != GRPC_ERROR_NONE) {
goto error;
}
- grpc_pollset *read_notifier_pollset =
+ read_notifier_pollset =
sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1) %
sp->server->pollset_count];
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index c08bb525b7..e9a7236c8c 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -95,9 +95,7 @@ struct shared_mutables {
gpr_mu mu;
} GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
-static struct shared_mutables g_shared_mutables = {
- .checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER, .initialized = false,
-};
+static struct shared_mutables g_shared_mutables;
static gpr_clock_type g_clock_type;
static gpr_timespec g_start_time;
@@ -155,6 +153,7 @@ void grpc_timer_list_init(gpr_timespec now) {
uint32_t i;
g_shared_mutables.initialized = true;
+ g_shared_mutables.checker_mu = GPR_SPINLOCK_INITIALIZER;
gpr_mu_init(&g_shared_mutables.mu);
g_clock_type = now.clock_type;
g_start_time = now;