aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-06-21 17:28:28 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-06-21 17:37:51 -0700
commit3131c269c14f97294ebf8b6e3d1a235d4acf3317 (patch)
tree60d737bcafbd752c2d0c89a2e7b36714ba67dfe5 /src
parentc3a9fae601142ec216dbce60944d6a00ae16db3b (diff)
Integrate with unified error reporting
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c116
1 files changed, 88 insertions, 28 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 7cc69c876d..d625b096a1 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -646,11 +646,18 @@ polling_island *polling_island_merge(polling_island *p, polling_island *q) {
return q;
}
-static void polling_island_global_init() {
+static grpc_error *polling_island_global_init() {
+ grpc_error *error = GRPC_ERROR_NONE;
+
gpr_mu_init(&g_pi_freelist_mu);
g_pi_freelist = NULL;
- grpc_wakeup_fd_init(&polling_island_wakeup_fd);
- grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
+
+ error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
+ if (error == GRPC_ERROR_NONE) {
+ error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
+ }
+
+ return error;
}
static void polling_island_global_shutdown() {
@@ -870,21 +877,33 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
gpr_mu_unlock(&fd->pi_mu);
- grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);
gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
}
+static grpc_error *fd_shutdown_error(bool shutdown) {
+ if (!shutdown) {
+ return GRPC_ERROR_NONE;
+ } else {
+ return GRPC_ERROR_CREATE("FD shutdown");
+ }
+}
+
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
- if (*st == CLOSURE_NOT_READY) {
+ if (fd->shutdown) {
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
+ NULL);
+ } else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
+ grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
+ NULL);
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
@@ -906,7 +925,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
+ grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -964,11 +983,11 @@ static void sig_handler(int sig_num) {
static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
/* Global state management */
-static void pollset_global_init(void) {
- grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_pollset);
gpr_tls_init(&g_current_thread_worker);
poller_kick_init();
+ return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
static void pollset_global_shutdown(void) {
@@ -977,8 +996,13 @@ static void pollset_global_shutdown(void) {
gpr_tls_destroy(&g_current_thread_worker);
}
-static void pollset_worker_kick(grpc_pollset_worker *worker) {
- pthread_kill(worker->pt_id, grpc_wakeup_signal);
+static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
+ grpc_error *err = GRPC_ERROR_NONE;
+ int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
+ if (err_num != 0) {
+ err = GRPC_OS_ERROR(err_num, "pthread_kill");
+ }
+ return err;
}
/* Return 1 if the pollset has active threads in pollset_work (pollset must
@@ -1014,10 +1038,19 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next->prev = worker;
}
+static void kick_append_error(grpc_error **composite, grpc_error *error) {
+ if (error == GRPC_ERROR_NONE) return;
+ if (*composite == GRPC_ERROR_NONE) {
+ *composite = GRPC_ERROR_CREATE("Kick Failure");
+ }
+ *composite = grpc_error_add_child(*composite, error);
+}
+
/* p->mu must be held before calling this function */
-static void pollset_kick(grpc_pollset *p,
- grpc_pollset_worker *specific_worker) {
+static grpc_error *pollset_kick(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker) {
GPR_TIMER_BEGIN("pollset_kick", 0);
+ grpc_error *error = GRPC_ERROR_NONE;
grpc_pollset_worker *worker = specific_worker;
if (worker != NULL) {
@@ -1027,7 +1060,7 @@ static void pollset_kick(grpc_pollset *p,
for (worker = p->root_worker.next; worker != &p->root_worker;
worker = worker->next) {
if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
- pollset_worker_kick(worker);
+ kick_append_error(&error, pollset_worker_kick(worker));
}
}
} else {
@@ -1037,7 +1070,7 @@ static void pollset_kick(grpc_pollset *p,
} else {
GPR_TIMER_MARK("kicked_specifically", 0);
if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
- pollset_worker_kick(worker);
+ kick_append_error(&error, pollset_worker_kick(worker));
}
}
} else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
@@ -1053,7 +1086,7 @@ static void pollset_kick(grpc_pollset *p,
if (worker != NULL) {
GPR_TIMER_MARK("finally_kick", 0);
push_back_worker(p, worker);
- pollset_worker_kick(worker);
+ kick_append_error(&error, pollset_worker_kick(worker));
} else {
GPR_TIMER_MARK("kicked_no_pollers", 0);
p->kicked_without_pollers = true;
@@ -1061,9 +1094,13 @@ static void pollset_kick(grpc_pollset *p,
}
GPR_TIMER_END("pollset_kick", 0);
+ GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
+ return error;
}
-static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
+static grpc_error *kick_poller(void) {
+ return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
+}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->mu);
@@ -1139,7 +1176,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
/* Release the ref and set pollset->polling_island to NULL */
pollset_release_polling_island(pollset, "ps_shutdown");
- grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
}
/* pollset->mu lock must be held by the caller before calling this */
@@ -1181,14 +1218,23 @@ static void pollset_reset(grpc_pollset *pollset) {
pollset_release_polling_island(pollset, "ps_reset");
}
+static void work_combine_error(grpc_error **composite, grpc_error *error) {
+ if (error == GRPC_ERROR_NONE) return;
+ if (*composite == GRPC_ERROR_NONE) {
+ *composite = GRPC_ERROR_CREATE("pollset_work");
+ }
+ *composite = grpc_error_add_child(*composite, error);
+}
+
#define GRPC_EPOLL_MAX_EVENTS 1000
-static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset, int timeout_ms,
- sigset_t *sig_mask) {
+static grpc_error *pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ int timeout_ms, sigset_t *sig_mask) {
struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int epoll_fd = -1;
int ep_rv;
polling_island *pi = NULL;
+ grpc_error *error = GRPC_ERROR_NONE;
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
@@ -1232,6 +1278,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
if (ep_rv < 0) {
if (errno != EINTR) {
gpr_log(GPR_ERROR, "epoll_pwait() failed: %s", strerror(errno));
+ work_combine_error(&error, GRPC_OS_ERROR(errno, "epoll_pwait"));
} else {
/* We were interrupted. Save an interation by doing a zero timeout
epoll_wait to see if there are any other events of interest */
@@ -1247,7 +1294,8 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr;
if (data_ptr == &grpc_global_wakeup_fd) {
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ work_combine_error(
+ &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
} else if (data_ptr == &polling_island_wakeup_fd) {
/* This means that our polling island is merged with a different
island. We do not have to do anything here since the subsequent call
@@ -1278,16 +1326,18 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
PI_UNREF(pi, "ps_work");
GPR_TIMER_END("pollset_work_and_unlock", 0);
+ return error;
}
/* pollset->mu lock must be held by the caller before calling this.
The function pollset_work() may temporarily release the lock (pollset->mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
-static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker_hdl, gpr_timespec now,
- gpr_timespec deadline) {
+static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker_hdl,
+ gpr_timespec now, gpr_timespec deadline) {
GPR_TIMER_BEGIN("pollset_work", 0);
+ grpc_error *error = GRPC_ERROR_NONE;
int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
sigset_t new_mask;
@@ -1316,7 +1366,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
push_front_worker(pollset, &worker);
- pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
+ error = pollset_work_and_unlock(exec_ctx, pollset, timeout_ms, &orig_mask);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
@@ -1345,6 +1395,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
GPR_TIMER_END("pollset_work", 0);
+ GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
+ return error;
}
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -1659,8 +1711,16 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) {
}
fd_global_init();
- pollset_global_init();
- polling_island_global_init();
+
+ if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
+ return NULL;
+ }
+
+ if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
+ polling_island_global_init())) {
+ return NULL;
+ }
+
return &vtable;
}