From 42b004a2a5f785094f6c9bccaf4090e2c7c6e9b5 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Fri, 8 Apr 2016 14:41:49 -0700 Subject: first cut of changes --- src/core/lib/iomgr/ev_posix.h | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src/core/lib/iomgr/ev_posix.h') diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1fa9f5ef2d..4cfa83e6a2 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -55,6 +55,8 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); + grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, + grpc_fd *fd); void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu); void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -137,6 +139,10 @@ void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); +/* Return the read notifier pollset from the fd */ +grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, + grpc_fd *fd); + /* pollset_posix functions */ /* Add an fd to a pollset */ -- cgit v1.2.3 From 5e28d71f3de6e4edc72b703e07b43709d8cc783f Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 12 Apr 2016 10:45:07 -0700 Subject: fix formatting --- src/core/lib/iomgr/ev_posix.c | 3 ++- src/core/lib/iomgr/ev_posix.h | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'src/core/lib/iomgr/ev_posix.h') diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index af4126c900..8c6ec90684 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -83,7 +83,8 @@ void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, g_event_engine->fd_notify_on_write(exec_ctx, fd, closure); } -grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { +grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, + grpc_fd *fd) { return g_event_engine->fd_get_read_notifier_pollset(exec_ctx, fd); } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 4cfa83e6a2..344bf63438 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -56,7 +56,7 @@ typedef struct grpc_event_engine_vtable { void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, - grpc_fd *fd); + grpc_fd *fd); void (*pollset_init)(grpc_pollset *pollset, gpr_mu **mu); void (*pollset_shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -141,7 +141,7 @@ void grpc_fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, /* Return the read notifier pollset from the fd */ grpc_pollset *grpc_fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, - grpc_fd *fd); + grpc_fd *fd); /* pollset_posix functions */ -- cgit v1.2.3 From 4f1d0f337b15748d171ec21dddba1f6d80e94778 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 6 May 2016 17:12:37 -0700 Subject: Error reporting progress --- src/core/lib/iomgr/error.c | 10 ++ src/core/lib/iomgr/error.h | 6 + src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 131 ++++++++++++++------- src/core/lib/iomgr/ev_posix.c | 16 +-- src/core/lib/iomgr/ev_posix.h | 12 +- src/core/lib/iomgr/pollset.h | 11 +- src/core/lib/iomgr/tcp_client_posix.c | 15 ++- src/core/lib/iomgr/wakeup_fd_pipe.c | 22 ++-- src/core/lib/iomgr/wakeup_fd_posix.c | 12 +- src/core/lib/iomgr/wakeup_fd_posix.h | 15 ++- src/core/lib/iomgr/workqueue.h | 10 +- src/core/lib/iomgr/workqueue_posix.c | 62 ++++++---- .../google_default/google_default_credentials.c | 13 +- src/core/lib/surface/completion_queue.c | 38 +++++- test/core/iomgr/endpoint_tests.c | 10 +- test/core/security/oauth2_utils.c | 12 +- 16 files changed, 266 insertions(+), 129 deletions(-) (limited to 'src/core/lib/iomgr/ev_posix.h') diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index d5587a4194..bc50d6870f 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -469,3 +469,13 @@ grpc_error *grpc_os_error(const char *file, int line, int err, GRPC_ERROR_STR_OS_ERROR, strerror(err)), GRPC_ERROR_STR_SYSCALL, call_name); } + +bool grpc_log_if_error(const char *what, grpc_error *error, const char *file, + int line) { + if (error == GRPC_ERROR_NONE) return true; + const char *msg = grpc_error_string(error); + gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, "%s: %s", what, msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(error); + return false; +} diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 98661a6a2c..2b56aede75 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -34,6 +34,7 @@ #ifndef GRPC_CORE_LIB_IOMGR_ERROR_H #define GRPC_CORE_LIB_IOMGR_ERROR_H +#include #include #include @@ -114,4 +115,9 @@ grpc_error *grpc_os_error(const char *file, int line, int err, #define GRPC_OS_ERROR(err, call_name) \ grpc_os_error(__FILE__, __LINE__, err, call_name) +bool grpc_log_if_error(const char *what, grpc_error *error, const char *file, + int line); +#define GRPC_LOG_IF_ERROR(what, error) \ + grpc_log_if_error((what), (error), __FILE__, __LINE__) + #endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */ diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index d11c947df2..288a44103b 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -217,9 +217,10 @@ struct grpc_pollset { struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd, int and_unlock_pollset); - void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now); + grpc_error *(*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now); void (*finish_shutdown)(grpc_pollset *pollset); void (*destroy)(grpc_pollset *pollset); }; @@ -247,9 +248,9 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 /* As per pollset_kick, with an extended set of flags (defined above) -- mostly for fd_posix's use. */ -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags); +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) GRPC_MUST_USE_RESULT; /* turn a pollset into a multipoller: platform specific */ typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, @@ -415,12 +416,13 @@ static bool fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void pollset_kick_locked(grpc_fd_watcher *watcher) { +static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) { gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); - pollset_kick_ext(watcher->pollset, watcher->worker, - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker, + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); gpr_mu_unlock(&watcher->pollset->mu); + return err; } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { @@ -726,10 +728,19 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags) { +static void kick_append_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("Kick Failure"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) { GPR_TIMER_BEGIN("pollset_kick_ext", 0); + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ if (specific_worker != NULL) { @@ -739,7 +750,8 @@ static void pollset_kick_ext(grpc_pollset *p, for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; specific_worker = specific_worker->next) { - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } p->kicked_without_pollers = 1; GPR_TIMER_END("pollset_kick_ext.broadcast", 0); @@ -750,14 +762,16 @@ static void pollset_kick_ext(grpc_pollset *p, specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { GPR_TIMER_MARK("kick_yoself", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); @@ -778,7 +792,8 @@ static void pollset_kick_ext(grpc_pollset *p, if (specific_worker != NULL) { GPR_TIMER_MARK("finally_kick", 0); push_back_worker(p, specific_worker); - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else { GPR_TIMER_MARK("kicked_no_pollers", 0); @@ -787,20 +802,21 @@ static void pollset_kick_ext(grpc_pollset *p, } GPR_TIMER_END("pollset_kick_ext", 0); + return error; } -static void pollset_kick(grpc_pollset *p, - grpc_pollset_worker *specific_worker) { - pollset_kick_ext(p, specific_worker, 0); +static grpc_error *pollset_kick(grpc_pollset *p, + grpc_pollset_worker *specific_worker) { + return pollset_kick_ext(p, specific_worker, 0); } /* global state management */ -static void pollset_global_init(void) { +static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); gpr_tls_init(&g_current_thread_worker); grpc_wakeup_fd_global_init(); - grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); } static void pollset_global_shutdown(void) { @@ -810,7 +826,9 @@ static void pollset_global_shutdown(void) { grpc_wakeup_fd_global_destroy(); } -static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } +static grpc_error *kick_poller(void) { + return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); +} /* main interface */ @@ -876,11 +894,12 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } -static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker_hdl, gpr_timespec now, - gpr_timespec deadline) { +static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker_hdl, + gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; *worker_hdl = &worker; + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ int added_worker = 0; @@ -896,7 +915,10 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); - grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + if (error != GRPC_ERROR_NONE) { + return error; + } } worker.kicked_specifically = 0; /* If there's work waiting for the pollset to be idle, and the @@ -933,8 +955,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, - deadline, now); + error = pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, + deadline, now); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); @@ -955,7 +977,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* If we're forced to re-evaluate polling (via pollset_kick with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force a loop */ - if (worker.reevaluate_polling_on_wakeup) { + if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) { worker.reevaluate_polling_on_wakeup = 0; pollset->kicked_without_pollers = 0; if (queued_work || worker.kicked_specifically) { @@ -996,6 +1018,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } *worker_hdl = NULL; GPR_TIMER_END("pollset_work", 0); + return error; } static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -1156,11 +1179,19 @@ exit: } } -static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, - gpr_timespec now) { +static void work_combine_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("pollset_work"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *basic_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now) { + grpc_error *error = GRPC_ERROR_NONE; + #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) @@ -1210,7 +1241,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (r < 0) { if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); } if (fd) { fd_end_poll(exec_ctx, &fd_watcher, 0, 0); @@ -1221,10 +1252,12 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, } } else { if (pfd[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); } if (pfd[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } if (nfds > 2) { fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, @@ -1237,6 +1270,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (fd) { GRPC_FD_UNREF(fd, "basicpoll_begin"); } + + return error; } static void basic_pollset_destroy(grpc_pollset *pollset) { @@ -1297,9 +1332,11 @@ exit: } } -static void multipoll_with_poll_pollset_maybe_work_and_unlock( +static grpc_error *multipoll_with_poll_pollset_maybe_work_and_unlock( grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { + grpc_error *error = GRPC_ERROR_NONE; + #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) @@ -1374,10 +1411,12 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( } } else { if (pfds[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); } if (pfds[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { @@ -1391,6 +1430,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( gpr_free(pfds); gpr_free(watchers); + return error; } static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { @@ -1934,14 +1974,23 @@ static const grpc_event_engine_vtable vtable = { }; const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) { + const char *msg; + grpc_error *err = GRPC_ERROR_NONE; #ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL platform_become_multipoller = epoll_become_multipoller; #else platform_become_multipoller = poll_become_multipoller; #endif fd_global_init(); - pollset_global_init(); + err = pollset_global_init(); + if (err != GRPC_ERROR_NONE) goto error; return &vtable; + +error: + msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + return NULL; } #endif diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 7df1751352..3b965c8b8c 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -101,15 +101,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { g_event_engine->pollset_destroy(pollset); } -void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) { - g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); +grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline) { + return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); } -void grpc_pollset_kick(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker) { - g_event_engine->pollset_kick(pollset, specific_worker); +grpc_error *grpc_pollset_kick(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) { + return g_event_engine->pollset_kick(pollset, specific_worker); } void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -159,6 +159,6 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, g_event_engine->pollset_set_del_fd(exec_ctx, pollset_set, fd); } -void grpc_kick_poller(void) { g_event_engine->kick_poller(); } +grpc_error *grpc_kick_poller(void) { return g_event_engine->kick_poller(); } #endif // GPR_POSIX_SOCKET diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1fa9f5ef2d..c9f9fd9957 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -61,11 +61,11 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*pollset_reset)(grpc_pollset *pollset); void (*pollset_destroy)(grpc_pollset *pollset); - void (*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); - void (*pollset_kick)(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker); + grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline); + grpc_error *(*pollset_kick)(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker); void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); @@ -88,7 +88,7 @@ typedef struct grpc_event_engine_vtable { void (*pollset_set_del_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd); - void (*kick_poller)(void); + grpc_error *(*kick_poller)(void); void (*shutdown_engine)(void); } grpc_event_engine_vtable; diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index c40a474877..8d9edc8406 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -81,14 +81,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset); May call grpc_closure_list_run on grpc_closure_list, without holding the pollset lock */ -void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); +grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline) GRPC_MUST_USE_RESULT; /* Break one polling thread out of polling work for this pollset. If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers. Otherwise, if specific_worker is non-NULL, then kick that worker. */ -void grpc_pollset_kick(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker); +grpc_error *grpc_pollset_kick(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) + GRPC_MUST_USE_RESULT; #endif /* GRPC_CORE_LIB_IOMGR_POLLSET_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index f5a3fb4aa3..c70f11b40a 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -76,13 +76,16 @@ static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) { GPR_ASSERT(fd >= 0); - if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) || - (!grpc_is_unix_socket(addr) && !grpc_set_socket_low_latency(fd, 1)) || - !grpc_set_socket_no_sigpipe_if_possible(fd)) { - gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, - strerror(errno)); - goto error; + err = grpc_set_socket_nonblocking(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_cloexec(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + if (!grpc_is_unix_socket(addr)) { + err = grpc_set_socket_low_latency(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; } + err = grpc_set_socket_no_sigpipe_if_possible(fd); + if (err != GRPC_ERROR_NONE) goto error; goto done; error: diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c index e9b9a0119f..4e5dbdcb73 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.c +++ b/src/core/lib/iomgr/wakeup_fd_pipe.c @@ -45,7 +45,7 @@ #include "src/core/lib/iomgr/socket_utils_posix.h" -static void pipe_init(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_init(grpc_wakeup_fd* fd_info) { int pipefd[2]; /* TODO(klempner): Make this nonfatal */ int r = pipe(pipefd); @@ -53,36 +53,40 @@ static void pipe_init(grpc_wakeup_fd* fd_info) { gpr_log(GPR_ERROR, "pipe creation failed (%d): %s", errno, strerror(errno)); abort(); } - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1)); - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1)); + grpc_error* err; + err = grpc_set_socket_nonblocking(pipefd[0], 1); + if (err != GRPC_ERROR_NONE) return err; + err = grpc_set_socket_nonblocking(pipefd[1], 1); + if (err != GRPC_ERROR_NONE) return err; fd_info->read_fd = pipefd[0]; fd_info->write_fd = pipefd[1]; + return GRPC_ERROR_NONE; } -static void pipe_consume(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_consume(grpc_wakeup_fd* fd_info) { char buf[128]; ssize_t r; for (;;) { r = read(fd_info->read_fd, buf, sizeof(buf)); if (r > 0) continue; - if (r == 0) return; + if (r == 0) return GRPC_ERROR_NONE; switch (errno) { case EAGAIN: - return; + return GRPC_ERROR_NONE; case EINTR: continue; default: - gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); - return; + return GRPC_OS_ERROR(errno, "read"); } } } -static void pipe_wakeup(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_wakeup(grpc_wakeup_fd* fd_info) { char c = 0; while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR) ; + return GRPC_ERROR_NONE; } static void pipe_destroy(grpc_wakeup_fd* fd_info) { diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c index 525369c356..046208abc8 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.c +++ b/src/core/lib/iomgr/wakeup_fd_posix.c @@ -53,16 +53,16 @@ void grpc_wakeup_fd_global_init(void) { void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; } -void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->init(fd_info); +grpc_error *grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->init(fd_info); } -void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->consume(fd_info); +grpc_error *grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->consume(fd_info); } -void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->wakeup(fd_info); +grpc_error *grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->wakeup(fd_info); } void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) { diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h index 6b069c1837..e269f242d8 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.h +++ b/src/core/lib/iomgr/wakeup_fd_posix.h @@ -62,6 +62,8 @@ #ifndef GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H #define GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H +#include "src/core/lib/iomgr/error.h" + void grpc_wakeup_fd_global_init(void); void grpc_wakeup_fd_global_destroy(void); @@ -72,9 +74,9 @@ void grpc_wakeup_fd_global_init_force_fallback(void); typedef struct grpc_wakeup_fd grpc_wakeup_fd; typedef struct grpc_wakeup_fd_vtable { - void (*init)(grpc_wakeup_fd* fd_info); - void (*consume)(grpc_wakeup_fd* fd_info); - void (*wakeup)(grpc_wakeup_fd* fd_info); + grpc_error* (*init)(grpc_wakeup_fd* fd_info); + grpc_error* (*consume)(grpc_wakeup_fd* fd_info); + grpc_error* (*wakeup)(grpc_wakeup_fd* fd_info); void (*destroy)(grpc_wakeup_fd* fd_info); /* Must be called before calling any other functions */ int (*check_availability)(void); @@ -89,9 +91,10 @@ extern int grpc_allow_specialized_wakeup_fd; #define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd) -void grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info); -void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info); -void grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info); +grpc_error* grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT; +grpc_error* grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info) + GRPC_MUST_USE_RESULT; +grpc_error* grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT; void grpc_wakeup_fd_destroy(grpc_wakeup_fd* fd_info); /* Defined in some specialized implementation's .c file, or by diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 20b47e1c14..cb6aed9bd4 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -50,9 +50,11 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ /** Create a work queue */ -grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx); +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue); -void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); +grpc_error *grpc_workqueue_flush( + grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) GRPC_MUST_USE_RESULT; #define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG @@ -77,7 +79,7 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); /** Add a work item to a workqueue */ -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - grpc_error *error); +void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error); #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */ diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index e27525dd50..1187e627a2 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -47,20 +47,25 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) { +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue) { char name[32]; - grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue)); - gpr_ref_init(&workqueue->refs, 1); - gpr_mu_init(&workqueue->mu); - workqueue->closure_list.head = workqueue->closure_list.tail = NULL; - grpc_wakeup_fd_init(&workqueue->wakeup_fd); - sprintf(name, "workqueue:%p", (void *)workqueue); - workqueue->wakeup_read_fd = - grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name); - grpc_closure_init(&workqueue->read_closure, on_readable, workqueue); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); - return workqueue; + *workqueue = gpr_malloc(sizeof(grpc_workqueue)); + gpr_ref_init(&(*workqueue)->refs, 1); + gpr_mu_init(&(*workqueue)->mu); + (*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL; + grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd); + if (err != GRPC_ERROR_NONE) { + gpr_free(*workqueue); + return err; + } + sprintf(name, "workqueue:%p", (void *)(*workqueue)); + (*workqueue)->wakeup_read_fd = grpc_fd_create( + GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name); + grpc_closure_init(&(*workqueue)->read_closure, on_readable, workqueue); + grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd, + &(*workqueue)->read_closure); + return GRPC_ERROR_NONE; } static void workqueue_destroy(grpc_exec_ctx *exec_ctx, @@ -101,13 +106,16 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd); } -void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { +grpc_error *grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, + grpc_workqueue *workqueue) { + grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { - grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); gpr_mu_unlock(&workqueue->mu); + return error; } static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -123,20 +131,32 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } else { gpr_mu_lock(&workqueue->mu); grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); - grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); + error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); + if (error == GRPC_ERROR_NONE) { + grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, + &workqueue->read_closure); + } else { + /* recurse to get error handling */ + on_readable(exec_ctx, arg, error); + } } } -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - grpc_error *error) { +void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + grpc_error *push_error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { - grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } grpc_closure_list_append(&workqueue->closure_list, closure, error); + if (push_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(push_error); + gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg); + grpc_error_free_string(msg); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); + } gpr_mu_unlock(&workqueue->mu); } diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index a67f19ae0f..019def95c2 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -88,7 +88,7 @@ static void on_compute_engine_detection_http_response(grpc_exec_ctx *exec_ctx, } gpr_mu_lock(g_polling_mu); detector->is_done = 1; - grpc_pollset_kick(detector->pollset, NULL); + GRPC_LOG_IF_ERROR("Pollset kick", grpc_pollset_kick(detector->pollset, NULL)); gpr_mu_unlock(g_polling_mu); } @@ -131,9 +131,14 @@ static int is_stack_running_on_compute_engine(void) { gpr_mu_lock(g_polling_mu); while (!detector.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, detector.pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + if (!GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, detector.pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)))) { + detector.is_done = 1; + detector.success = 0; + } } gpr_mu_unlock(g_polling_mu); diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 50e2a1bd16..5bb9390288 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -263,8 +263,15 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, break; } } - grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); + grpc_error *kick_error = + grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); gpr_mu_unlock(cc->mu); + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(kick_error); + } } else { cc->completed_tail->next = ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); @@ -343,8 +350,18 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_mu_lock(cc->mu); continue; } else { - grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, - iteration_deadline); + grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), + &worker, now, iteration_deadline); + if (err != GRPC_ERROR_NONE) { + gpr_mu_unlock(cc->mu); + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + break; + } } } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); @@ -460,8 +477,19 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(cc->mu); } else { - grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, - iteration_deadline); + grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), + &worker, now, iteration_deadline); + if (err != GRPC_ERROR_NONE) { + del_plucker(cc, tag, &worker); + gpr_mu_unlock(cc->mu); + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + break; + } } del_plucker(cc, tag, &worker); } diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index ded76bccec..8eec10414d 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -137,7 +137,7 @@ static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Read handler done"); gpr_mu_lock(g_mu); state->read_done = 1 + (error == GRPC_ERROR_NONE); - grpc_pollset_kick(g_pollset, NULL); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)); gpr_mu_unlock(g_mu); } else if (error == GRPC_ERROR_NONE) { grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, @@ -172,7 +172,7 @@ static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Write handler done"); gpr_mu_lock(g_mu); state->write_done = 1 + (error != GRPC_ERROR_NONE); - grpc_pollset_kick(g_pollset, NULL); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)); gpr_mu_unlock(g_mu); } @@ -237,8 +237,10 @@ static void read_and_write_test(grpc_endpoint_test_config config, while (!state.read_done || !state.write_done) { grpc_pollset_worker *worker = NULL; GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline))); } gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c index e55ce5c7b4..c92850d681 100644 --- a/test/core/security/oauth2_utils.c +++ b/test/core/security/oauth2_utils.c @@ -70,7 +70,7 @@ static void on_oauth2_response(grpc_exec_ctx *exec_ctx, void *user_data, gpr_mu_lock(request->mu); request->is_done = 1; request->token = token; - grpc_pollset_kick(request->pollset, NULL); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(request->pollset, NULL)); gpr_mu_unlock(request->mu); } @@ -99,9 +99,13 @@ char *grpc_test_fetch_oauth2_token_with_credentials( gpr_mu_lock(request.mu); while (!request.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, request.pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + if (GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, request.pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)))) { + request.is_done = 1; + } } gpr_mu_unlock(request.mu); -- cgit v1.2.3 From 52f231212f0385bae40dd6e45ed1c4bf823bf3c2 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 15 Jun 2016 09:34:14 -0700 Subject: Make {endpoint,fd}_shutdown idempotent --- src/core/lib/iomgr/endpoint.h | 2 +- src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 24 +++++++++++++++++----- src/core/lib/iomgr/ev_poll_posix.c | 24 +++++++++++++++++----- src/core/lib/iomgr/ev_posix.c | 4 ++++ src/core/lib/iomgr/ev_posix.h | 6 +++++- src/core/lib/iomgr/tcp_posix.c | 2 +- test/core/iomgr/endpoint_tests.c | 30 ++++++++++++++++------------ test/core/iomgr/tcp_posix_test.c | 2 +- 8 files changed, 67 insertions(+), 27 deletions(-) (limited to 'src/core/lib/iomgr/ev_posix.h') diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 3877ceb1e2..f9808bbda1 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -82,7 +82,7 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep); void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb); -/* Causes any pending read/write callbacks to run immediately with +/* Causes any pending and future read/write callbacks to run immediately with success==0 */ void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 943c404f91..ef8bbd6277 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -515,7 +515,9 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { - if (*st == CLOSURE_NOT_READY) { + if (fd->shutdown) { + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); + } else if (*st == CLOSURE_NOT_READY) { /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; } else if (*st == CLOSURE_READY) { @@ -557,13 +559,24 @@ static void set_read_notifier_pollset_locked( static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_lock(&fd->mu); - GPR_ASSERT(!fd->shutdown); - fd->shutdown = 1; - set_ready_locked(exec_ctx, fd, &fd->read_closure); - set_ready_locked(exec_ctx, fd, &fd->write_closure); + /* only shutdown once */ + if (!fd->shutdown) { + fd->shutdown = 1; + /* signal read/write closed to OS so that future operations fail */ + shutdown(fd->fd, SHUT_RDWR); + set_ready_locked(exec_ctx, fd, &fd->read_closure); + set_ready_locked(exec_ctx, fd, &fd->write_closure); + } gpr_mu_unlock(&fd->mu); } +static bool fd_is_shutdown(grpc_fd *fd) { + gpr_mu_lock(&fd->mu); + bool r = fd->shutdown; + gpr_mu_unlock(&fd->mu); + return r; +} + static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { gpr_mu_lock(&fd->mu); @@ -1938,6 +1951,7 @@ static const grpc_event_engine_vtable vtable = { .fd_wrapped_fd = fd_wrapped_fd, .fd_orphan = fd_orphan, .fd_shutdown = fd_shutdown, + .fd_is_shutdown = fd_is_shutdown, .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 0167999dad..6313032869 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -421,7 +421,9 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); } static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { - if (*st == CLOSURE_NOT_READY) { + if (fd->shutdown) { + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); + } else if (*st == CLOSURE_NOT_READY) { /* not ready ==> switch to a waiting state by setting the closure */ *st = closure; } else if (*st == CLOSURE_READY) { @@ -463,13 +465,24 @@ static void set_read_notifier_pollset_locked( static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_lock(&fd->mu); - GPR_ASSERT(!fd->shutdown); - fd->shutdown = 1; - set_ready_locked(exec_ctx, fd, &fd->read_closure); - set_ready_locked(exec_ctx, fd, &fd->write_closure); + /* only shutdown once */ + if (!fd->shutdown) { + fd->shutdown = 1; + /* signal read/write closed to OS so that future operations fail */ + shutdown(fd->fd, SHUT_RDWR); + set_ready_locked(exec_ctx, fd, &fd->read_closure); + set_ready_locked(exec_ctx, fd, &fd->write_closure); + } gpr_mu_unlock(&fd->mu); } +static bool fd_is_shutdown(grpc_fd *fd) { + gpr_mu_lock(&fd->mu); + bool r = fd->shutdown; + gpr_mu_unlock(&fd->mu); + return r; +} + static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { gpr_mu_lock(&fd->mu); @@ -1172,6 +1185,7 @@ static const grpc_event_engine_vtable vtable = { .fd_wrapped_fd = fd_wrapped_fd, .fd_orphan = fd_orphan, .fd_shutdown = fd_shutdown, + .fd_is_shutdown = fd_is_shutdown, .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 6477b05dcd..a17ee17945 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -153,6 +153,10 @@ void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { g_event_engine->fd_shutdown(exec_ctx, fd); } +bool grpc_fd_is_shutdown(grpc_fd *fd) { + return g_event_engine->fd_is_shutdown(fd); +} + void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { g_event_engine->fd_notify_on_read(exec_ctx, fd, closure); diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 344bf63438..53457aa9d5 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -55,6 +55,7 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); + bool (*fd_is_shutdown)(grpc_fd *fd); grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, grpc_fd *fd); @@ -116,7 +117,10 @@ int grpc_fd_wrapped_fd(grpc_fd *fd); void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, const char *reason); -/* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */ +/* Has grpc_fd_shutdown been called on an fd? */ +bool grpc_fd_is_shutdown(grpc_fd *fd); + +/* Cause any current and future callbacks to fail. */ void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd); /* Register read interest, causing read_cb to be called once when fd becomes diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index e2869224f1..1bfc190f7a 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -408,7 +408,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL); + grpc_exec_ctx_enqueue(exec_ctx, cb, !grpc_fd_is_shutdown(tcp->em_fd), NULL); return; } tcp->outgoing_buffer = buf; diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index b9331fd8fe..bf3d19c6fc 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -253,35 +253,39 @@ static void read_and_write_test(grpc_endpoint_test_config config, grpc_exec_ctx_finish(&exec_ctx); } -static void must_fail(grpc_exec_ctx *exec_ctx, void *arg, bool success) { - GPR_ASSERT(!success); - ++*(int *)arg; +static void inc_on_failure(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + *(int *)arg += (success == false); } static void multiple_shutdown_test(grpc_endpoint_test_config config) { - grpc_endpoint_test_fixture f = begin_test(config, "read_and_write_test", 128); + grpc_endpoint_test_fixture f = + begin_test(config, "multiple_shutdown_test", 128); int fail_count = 0; - gpr_slice_buffer incoming; - gpr_slice_buffer_init(&incoming); + gpr_slice_buffer slice_buffer; + gpr_slice_buffer_init(&slice_buffer); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_endpoint_read(&exec_ctx, f.client_ep, &incoming, - grpc_closure_create(must_fail, &fail_count)); + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_closure_create(inc_on_failure, &fail_count)); grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(fail_count == 0); grpc_endpoint_shutdown(&exec_ctx, f.client_ep); grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(fail_count == 1); - grpc_endpoint_read(&exec_ctx, f.client_ep, &incoming, - grpc_closure_create(must_fail, &fail_count)); + grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer, + grpc_closure_create(inc_on_failure, &fail_count)); grpc_exec_ctx_flush(&exec_ctx); GPR_ASSERT(fail_count == 2); + grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer, + grpc_closure_create(inc_on_failure, &fail_count)); + grpc_exec_ctx_flush(&exec_ctx); + GPR_ASSERT(fail_count == 3); grpc_endpoint_shutdown(&exec_ctx, f.client_ep); grpc_exec_ctx_flush(&exec_ctx); - GPR_ASSERT(fail_count == 2); + GPR_ASSERT(fail_count == 3); - gpr_slice_buffer_destroy(&incoming); + gpr_slice_buffer_destroy(&slice_buffer); grpc_endpoint_destroy(&exec_ctx, f.client_ep); grpc_endpoint_destroy(&exec_ctx, f.server_ep); @@ -293,12 +297,12 @@ void grpc_endpoint_tests(grpc_endpoint_test_config config, size_t i; g_pollset = pollset; g_mu = mu; + multiple_shutdown_test(config); read_and_write_test(config, 10000000, 100000, 8192, 0); read_and_write_test(config, 1000000, 100000, 1, 0); read_and_write_test(config, 100000000, 100000, 1, 1); for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { read_and_write_test(config, 40320, i, i, 0); } - multiple_shutdown_test(config); g_pollset = NULL; } diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 19ba258303..1112e8a3aa 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -517,8 +517,8 @@ int main(int argc, char **argv) { grpc_init(); g_pollset = gpr_malloc(grpc_pollset_size()); grpc_pollset_init(g_pollset, &g_mu); - run_tests(); grpc_endpoint_tests(configs[0], g_pollset, g_mu); + run_tests(); grpc_closure_init(&destroyed, destroy_pollset, g_pollset); grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); -- cgit v1.2.3 From 2e12db9c319bcbdbb2fa570149f88e4b496b558c Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Thu, 16 Jun 2016 16:53:59 -0700 Subject: Test polling island merges --- Makefile | 36 +++++ build.yaml | 12 ++ src/core/lib/iomgr/ev_epoll_linux.c | 51 ++++++- src/core/lib/iomgr/ev_epoll_linux.h | 6 + src/core/lib/iomgr/ev_posix.c | 7 + src/core/lib/iomgr/ev_posix.h | 3 + test/core/iomgr/ev_epoll_linux_test.c | 222 +++++++++++++++++++++++++++++++ tools/run_tests/sources_and_headers.json | 16 +++ tools/run_tests/tests.json | 15 +++ 9 files changed, 366 insertions(+), 2 deletions(-) create mode 100644 test/core/iomgr/ev_epoll_linux_test.c (limited to 'src/core/lib/iomgr/ev_posix.h') diff --git a/Makefile b/Makefile index e615704395..825684cc2d 100644 --- a/Makefile +++ b/Makefile @@ -905,6 +905,7 @@ dns_resolver_connectivity_test: $(BINDIR)/$(CONFIG)/dns_resolver_connectivity_te dns_resolver_test: $(BINDIR)/$(CONFIG)/dns_resolver_test dualstack_socket_test: $(BINDIR)/$(CONFIG)/dualstack_socket_test endpoint_pair_test: $(BINDIR)/$(CONFIG)/endpoint_pair_test +ev_epoll_linux_test: $(BINDIR)/$(CONFIG)/ev_epoll_linux_test fd_conservation_posix_test: $(BINDIR)/$(CONFIG)/fd_conservation_posix_test fd_posix_test: $(BINDIR)/$(CONFIG)/fd_posix_test fling_client: $(BINDIR)/$(CONFIG)/fling_client @@ -1242,6 +1243,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/dns_resolver_test \ $(BINDIR)/$(CONFIG)/dualstack_socket_test \ $(BINDIR)/$(CONFIG)/endpoint_pair_test \ + $(BINDIR)/$(CONFIG)/ev_epoll_linux_test \ $(BINDIR)/$(CONFIG)/fd_conservation_posix_test \ $(BINDIR)/$(CONFIG)/fd_posix_test \ $(BINDIR)/$(CONFIG)/fling_client \ @@ -1512,6 +1514,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/dualstack_socket_test || ( echo test dualstack_socket_test failed ; exit 1 ) $(E) "[RUN] Testing endpoint_pair_test" $(Q) $(BINDIR)/$(CONFIG)/endpoint_pair_test || ( echo test endpoint_pair_test failed ; exit 1 ) + $(E) "[RUN] Testing ev_epoll_linux_test" + $(Q) $(BINDIR)/$(CONFIG)/ev_epoll_linux_test || ( echo test ev_epoll_linux_test failed ; exit 1 ) $(E) "[RUN] Testing fd_conservation_posix_test" $(Q) $(BINDIR)/$(CONFIG)/fd_conservation_posix_test || ( echo test fd_conservation_posix_test failed ; exit 1 ) $(E) "[RUN] Testing fd_posix_test" @@ -7130,6 +7134,38 @@ endif endif +EV_EPOLL_LINUX_TEST_SRC = \ + test/core/iomgr/ev_epoll_linux_test.c \ + +EV_EPOLL_LINUX_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(EV_EPOLL_LINUX_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/ev_epoll_linux_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/ev_epoll_linux_test: $(EV_EPOLL_LINUX_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LD) $(LDFLAGS) $(EV_EPOLL_LINUX_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/ev_epoll_linux_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/iomgr/ev_epoll_linux_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_ev_epoll_linux_test: $(EV_EPOLL_LINUX_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(EV_EPOLL_LINUX_TEST_OBJS:.o=.dep) +endif +endif + + FD_CONSERVATION_POSIX_TEST_SRC = \ test/core/iomgr/fd_conservation_posix_test.c \ diff --git a/build.yaml b/build.yaml index 7790e0c517..84f4ea521b 100644 --- a/build.yaml +++ b/build.yaml @@ -1407,6 +1407,18 @@ targets: - grpc - gpr_test_util - gpr +- name: ev_epoll_linux_test + build: test + language: c + src: + - test/core/iomgr/ev_epoll_linux_test.c + deps: + - grpc_test_util + - grpc + - gpr_test_util + - gpr + platforms: + - linux - name: fd_conservation_posix_test build: test language: c diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 1fb5947464..ed2c494b78 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -317,8 +317,9 @@ static void polling_island_remove_all_fds_locked(polling_island *pi, if (err < 0 && errno != ENOENT) { /* TODO: sreek - We need a better way to bubble up this error instead of * just logging a message */ - gpr_log(GPR_ERROR, "epoll_ctl deleting fds[%zu]: %d failed with error: %s", - i, pi->fds[i]->fd, strerror(errno)); + gpr_log(GPR_ERROR, + "epoll_ctl deleting fds[%zu]: %d failed with error: %s", i, + pi->fds[i]->fd, strerror(errno)); } if (remove_fd_refs) { @@ -1458,6 +1459,52 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&bag->mu); } +/* Test helper functions + * */ +void *grpc_fd_get_polling_island(grpc_fd *fd) { + polling_island *pi; + + gpr_mu_lock(&fd->pi_mu); + pi = fd->polling_island; + gpr_mu_unlock(&fd->pi_mu); + + return pi; +} + +void *grpc_pollset_get_polling_island(grpc_pollset *ps) { + polling_island *pi; + + gpr_mu_lock(&ps->pi_mu); + pi = ps->polling_island; + gpr_mu_unlock(&ps->pi_mu); + + return pi; +} + +static polling_island *get_polling_island(polling_island *p) { + if (p == NULL) { + return NULL; + } + + polling_island *next; + gpr_mu_lock(&p->mu); + while (p->merged_to != NULL) { + next = p->merged_to; + gpr_mu_unlock(&p->mu); + p = next; + gpr_mu_lock(&p->mu); + } + gpr_mu_unlock(&p->mu); + + return p; +} + +bool grpc_are_polling_islands_equal(void *p, void *q) { + p = get_polling_island(p); + q = get_polling_island(q); + return p == q; +} + /******************************************************************************* * Event engine binding */ diff --git a/src/core/lib/iomgr/ev_epoll_linux.h b/src/core/lib/iomgr/ev_epoll_linux.h index 8c819975a4..7a494aba19 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.h +++ b/src/core/lib/iomgr/ev_epoll_linux.h @@ -38,4 +38,10 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void); +#ifdef GPR_LINUX_EPOLL +void *grpc_fd_get_polling_island(grpc_fd *fd); +void *grpc_pollset_get_polling_island(grpc_pollset *ps); +bool grpc_are_polling_islands_equal(void *p, void *q); +#endif /* defined(GPR_LINUX_EPOLL) */ + #endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 2b15967adc..5b20600a6f 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -54,6 +54,7 @@ grpc_poll_function_type grpc_poll_function = poll; static const grpc_event_engine_vtable *g_event_engine; +static const char* g_poll_strategy_name = NULL; typedef const grpc_event_engine_vtable *(*event_engine_factory_fn)(void); @@ -101,6 +102,7 @@ static void try_engine(const char *engine) { for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) { if (is(engine, g_factories[i].name)) { if ((g_event_engine = g_factories[i].factory())) { + g_poll_strategy_name = g_factories[i].name; gpr_log(GPR_DEBUG, "Using polling engine: %s", g_factories[i].name); return; } @@ -108,6 +110,11 @@ static void try_engine(const char *engine) { } } +/* Call this only after calling grpc_event_engine_init() */ +const char *grpc_get_poll_strategy_name() { + return g_poll_strategy_name; +} + void grpc_event_engine_init(void) { char *s = gpr_getenv("GRPC_POLL_STRATEGY"); if (s == NULL) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 344bf63438..3ed5a5f956 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -98,6 +98,9 @@ typedef struct grpc_event_engine_vtable { void grpc_event_engine_init(void); void grpc_event_engine_shutdown(void); +/* Return the name of the poll strategy */ +const char* grpc_get_poll_strategy_name(); + /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. This takes ownership of closing fd. */ diff --git a/test/core/iomgr/ev_epoll_linux_test.c b/test/core/iomgr/ev_epoll_linux_test.c new file mode 100644 index 0000000000..51da15faa7 --- /dev/null +++ b/test/core/iomgr/ev_epoll_linux_test.c @@ -0,0 +1,222 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/ev_epoll_linux.h" +#include "src/core/lib/iomgr/ev_posix.h" + +#include +#include +#include +#include + +#include +#include + +#include "src/core/lib/iomgr/iomgr.h" +#include "test/core/util/test_config.h" + +typedef struct test_pollset { + grpc_pollset *pollset; + gpr_mu *mu; +} test_pollset; + +typedef struct test_fd { + int inner_fd; + grpc_fd *fd; +} test_fd; + +static void test_fd_init(test_fd *fds, int num_fds) { + int i; + for (i = 0; i < num_fds; i++) { + fds[i].inner_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + fds[i].fd = grpc_fd_create(fds[i].inner_fd, "test_fd"); + } +} + +static void test_fd_cleanup(grpc_exec_ctx *exec_ctx, test_fd *fds, + int num_fds) { + int release_fd; + int i; + + for (i = 0; i < num_fds; i++) { + grpc_fd_shutdown(exec_ctx, fds[i].fd); + grpc_exec_ctx_flush(exec_ctx); + + grpc_fd_orphan(exec_ctx, fds[i].fd, NULL, &release_fd, "test_fd_cleanup"); + grpc_exec_ctx_flush(exec_ctx); + + GPR_ASSERT(release_fd == fds[i].inner_fd); + close(fds[i].inner_fd); + } +} + +static void test_pollset_init(test_pollset *pollsets, int num_pollsets) { + int i; + for (i = 0; i < num_pollsets; i++) { + pollsets[i].pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(pollsets[i].pollset, &pollsets[i].mu); + } +} + +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { + grpc_pollset_destroy(p); +} + +static void test_pollset_cleanup(grpc_exec_ctx *exec_ctx, + test_pollset *pollsets, int num_pollsets) { + grpc_closure destroyed; + int i; + + for (i = 0; i < num_pollsets; i++) { + grpc_closure_init(&destroyed, destroy_pollset, pollsets[i].pollset); + grpc_pollset_shutdown(exec_ctx, pollsets[i].pollset, &destroyed); + + grpc_exec_ctx_flush(exec_ctx); + gpr_free(pollsets[i].pollset); + } +} + +#define NUM_FDS 8 +#define NUM_POLLSETS 4 +/* + * Cases to test: + * case 1) Polling islands of both fd and pollset are NULL + * case 2) Polling island of fd is NULL but that of pollset is not-NULL + * case 3) Polling island of fd is not-NULL but that of pollset is NULL + * case 4) Polling islands of both fd and pollset are not-NULL and: + * case 4.1) Polling islands of fd and pollset are equal + * case 4.2) Polling islands of fd and pollset are NOT-equal (This results + * in a merge) + * */ +static void test_add_fd_to_pollset() { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + test_fd fds[NUM_FDS]; + test_pollset pollsets[NUM_POLLSETS]; + void *expected_pi = NULL; + int i; + + test_fd_init(fds, NUM_FDS); + test_pollset_init(pollsets, NUM_POLLSETS); + + /*Step 1. + * Create three polling islands (This will exercise test case 1 and 2) with + * the following configuration: + * polling island 0 = { fds:0,1,2, pollsets:0} + * polling island 1 = { fds:3,4, pollsets:1} + * polling island 2 = { fds:5,6,7 pollsets:2} + * + *Step 2. + * Add pollset 3 to polling island 0 (by adding fds 0 and 1 to pollset 3) + * (This will exercise test cases 3 and 4.1). The configuration becomes: + * polling island 0 = { fds:0,1,2, pollsets:0,3} <<< pollset 3 added here + * polling island 1 = { fds:3,4, pollsets:1} + * polling island 2 = { fds:5,6,7 pollsets:2} + * + *Step 3. + * Merge polling islands 0 and 1 by adding fd 0 to pollset 1 (This will + * exercise test case 4.2). The configuration becomes: + * polling island (merged) = {fds: 0,1,2,3,4, pollsets: 0,1,3} + * polling island 2 = {fds: 5,6,7 pollsets: 2} + * + *Step 4. + * Finally do one more merge by adding fd 3 to pollset 2. + * polling island (merged) = {fds: 0,1,2,3,4,5,6,7, pollsets: 0,1,2,3} + */ + + /* == Step 1 == */ + for (i = 0; i <= 2; i++) { + grpc_pollset_add_fd(&exec_ctx, pollsets[0].pollset, fds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); + } + + for (i = 3; i <= 4; i++) { + grpc_pollset_add_fd(&exec_ctx, pollsets[1].pollset, fds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); + } + + for (i = 5; i <= 7; i++) { + grpc_pollset_add_fd(&exec_ctx, pollsets[2].pollset, fds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); + } + + /* == Step 2 == */ + for (i = 0; i <= 1; i++) { + grpc_pollset_add_fd(&exec_ctx, pollsets[3].pollset, fds[i].fd); + grpc_exec_ctx_flush(&exec_ctx); + } + + /* == Step 3 == */ + grpc_pollset_add_fd(&exec_ctx, pollsets[1].pollset, fds[0].fd); + grpc_exec_ctx_flush(&exec_ctx); + + /* == Step 4 == */ + grpc_pollset_add_fd(&exec_ctx, pollsets[2].pollset, fds[3].fd); + grpc_exec_ctx_flush(&exec_ctx); + + /* All polling islands are merged at this point */ + + /* Compare Fd:0's polling island with that of all other Fds */ + expected_pi = grpc_fd_get_polling_island(fds[0].fd); + for (i = 1; i < NUM_FDS; i++) { + GPR_ASSERT(grpc_are_polling_islands_equal( + expected_pi, grpc_fd_get_polling_island(fds[i].fd))); + } + + /* Compare Fd:0's polling island with that of all other pollsets */ + for (i = 0; i < NUM_POLLSETS; i++) { + GPR_ASSERT(grpc_are_polling_islands_equal( + expected_pi, grpc_pollset_get_polling_island(pollsets[i].pollset))); + } + + test_fd_cleanup(&exec_ctx, fds, NUM_FDS); + test_pollset_cleanup(&exec_ctx, pollsets, NUM_POLLSETS); + grpc_exec_ctx_finish(&exec_ctx); +} + +int main(int argc, char **argv) { + const char *poll_strategy = NULL; + grpc_test_init(argc, argv); + grpc_iomgr_init(); + + poll_strategy = grpc_get_poll_strategy_name(); + if (poll_strategy != NULL && strcmp(poll_strategy, "epoll") == 0) { + test_add_fd_to_pollset(); + } else { + gpr_log(GPR_INFO, + "Skipping the test. The test is only relevant for 'epoll' " + "strategy. and the current strategy is: '%s'", + poll_strategy); + } + grpc_iomgr_shutdown(); + return 0; +} diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index e8ff61dc3f..e9df72e43a 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -315,6 +315,22 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc_test_util" + ], + "headers": [], + "language": "c", + "name": "ev_epoll_linux_test", + "src": [ + "test/core/iomgr/ev_epoll_linux_test.c" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 5a84a41b63..ba661840da 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -377,6 +377,21 @@ "windows" ] }, + { + "args": [], + "ci_platforms": [ + "linux" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "ev_epoll_linux_test", + "platforms": [ + "linux" + ] + }, { "args": [], "ci_platforms": [ -- cgit v1.2.3 From 0224dcc2dcda932a171776de325fa2e66c95478f Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Wed, 22 Jun 2016 18:04:00 -0700 Subject: clang format --- src/core/lib/iomgr/ev_epoll_linux.c | 12 ++++++------ src/core/lib/iomgr/ev_posix.h | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) (limited to 'src/core/lib/iomgr/ev_posix.h') diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 88cbc58634..b1e9ac8a63 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -323,7 +323,7 @@ static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds, #ifdef GRPC_TSAN /* See the definition of g_epoll_sync for more context */ - gpr_atm_rel_store(&g_epoll_sync, (gpr_atm) 0); + gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0); #endif /* defined(GRPC_TSAN) */ for (i = 0; i < fd_count; i++) { @@ -443,8 +443,8 @@ static polling_island *polling_island_create(grpc_fd *initial_fd) { pi->fds = NULL; } - gpr_atm_rel_store(&pi->ref_count, (gpr_atm) 0); - gpr_atm_rel_store(&pi->merged_to, (gpr_atm) NULL); + gpr_atm_rel_store(&pi->ref_count, (gpr_atm)0); + gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); @@ -473,7 +473,7 @@ static polling_island *polling_island_create(grpc_fd *initial_fd) { static void polling_island_delete(polling_island *pi) { GPR_ASSERT(pi->fd_cnt == 0); - gpr_atm_rel_store(&pi->merged_to, (gpr_atm) NULL); + gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); close(pi->epoll_fd); pi->epoll_fd = -1; @@ -649,7 +649,7 @@ static polling_island *polling_island_merge(polling_island *p, polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd); /* Add the 'merged_to' link from p --> q */ - gpr_atm_rel_store(&p->merged_to, (gpr_atm) q); + gpr_atm_rel_store(&p->merged_to, (gpr_atm)q); PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */ gpr_mu_unlock(&p->mu); @@ -811,7 +811,7 @@ static grpc_fd *fd_create(int fd, const char *name) { holding a lock to it anyway. */ gpr_mu_lock(&new_fd->mu); - gpr_atm_rel_store(&new_fd->refst, (gpr_atm) 1); + gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; new_fd->shutdown = false; new_fd->orphaned = false; diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 32260fe2ee..579c84ef70 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -100,7 +100,7 @@ void grpc_event_engine_init(void); void grpc_event_engine_shutdown(void); /* Return the name of the poll strategy */ -const char* grpc_get_poll_strategy_name(); +const char *grpc_get_poll_strategy_name(); /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. -- cgit v1.2.3