diff options
author | Sree Kuchibhotla <sreek@google.com> | 2017-04-19 19:37:39 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2017-04-21 09:53:42 -0700 |
commit | 1898a29f7ecf2ac4f706351d68dedd5767f02ec9 (patch) | |
tree | 31eb17dbb3fab45f493ab3791bf1d416ebe42199 | |
parent | bcdf9e8d899bf5018978c4f3635c51df3f0c7d09 (diff) |
Fixed pollers
-rw-r--r-- | include/grpc++/server_builder.h | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 362 | ||||
-rw-r--r-- | test/core/iomgr/ev_epoll_linux_test.c | 46 |
3 files changed, 320 insertions, 90 deletions
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h index 7ac23349c8..68c5344e2a 100644 --- a/include/grpc++/server_builder.h +++ b/include/grpc++/server_builder.h @@ -195,7 +195,7 @@ class ServerBuilder { struct SyncServerSettings { SyncServerSettings() - : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {} + : num_cqs(gpr_cpu_num_cores()), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {} // Number of server completion queues to create to listen to incoming RPCs. int num_cqs; diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index e603a75593..16a9199ab9 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -97,6 +97,9 @@ void grpc_use_signal(int signum) { } } +/* The maximum number of polling threads per polling island */ +#define GRPC_MAX_POLLERS_PER_ISLAND 1 + struct polling_island; typedef enum { @@ -195,6 +198,11 @@ static void fd_global_shutdown(void); #endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */ +typedef struct worker_node { + struct worker_node *next; + struct worker_node *prev; +} worker_node; + /* This is also used as grpc_workqueue (by directly casing it) */ typedef struct polling_island { grpc_closure_scheduler workqueue_scheduler; @@ -229,6 +237,9 @@ typedef struct polling_island { /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ grpc_wakeup_fd workqueue_wakeup_fd; + gpr_mu worker_list_mu; + worker_node worker_list_head; + /* The fd of the underlying epoll set */ int epoll_fd; @@ -241,14 +252,21 @@ typedef struct polling_island { /******************************************************************************* * Pollset Declarations */ +#define WORKER_FROM_WORKER_LIST_NODE(p) \ + (struct grpc_pollset_worker *)(((char *)(p)) - \ + offsetof(grpc_pollset_worker, pi_list_link)) struct grpc_pollset_worker { /* Thread id of this worker */ pthread_t pt_id; /* Used to prevent a worker from getting kicked multiple times */ gpr_atm is_kicked; + struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; + + gpr_atm is_polling_turn; + worker_node pi_list_link; }; struct grpc_pollset { @@ -392,7 +410,47 @@ static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) { } } -/* The caller is expected to hold pi->mu lock before calling this function */ +static void worker_node_init(worker_node *node) { + node->next = node->prev = node; +} + +/* Not thread safe. Do under a list-level lock */ +static void push_back_worker_node(worker_node *head, worker_node *node) { + node->next = head; + node->prev = head->prev; + head->prev->next = node; + head->prev = node; +} + +/* Not thread safe. Do under a list-level lock */ +static void remove_worker_node(worker_node *node) { + node->next->prev = node->prev; + node->prev->next = node->next; + /* If node's next and prev point to itself, the node is considered detached + * from the list*/ + node->next = node->prev = node; +} + +/* Not thread safe. Do under a list-level lock */ +static worker_node *pop_front_worker_node(worker_node *head) { + worker_node *node = head->next; + if (node != head) { + remove_worker_node(node); + } else { + node = NULL; + } + + return node; +} + +/* Returns true if the node's next and prev are pointing to itself (which + indicates that the node is not in the list */ +static bool is_worker_node_detached(worker_node *node) { + return (node->next == node->prev && node->next == node); +} + +/* The caller is expected to hold pi->mu lock before calling this function + */ static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds, size_t fd_count, bool add_fd_refs, grpc_error **error) { @@ -546,6 +604,9 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, gpr_atm_rel_store(&pi->poller_count, 0); gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); + gpr_mu_init(&pi->worker_list_mu); + worker_node_init(&pi->worker_list_head); + if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd), err_desc)) { goto done; @@ -584,6 +645,9 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) { gpr_mpscq_destroy(&pi->workqueue_items); gpr_mu_destroy(&pi->mu); grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd); + gpr_mu_destroy(&pi->worker_list_mu); + GPR_ASSERT(is_worker_node_detached(&pi->worker_list_head)); + gpr_free(pi->fds); gpr_free(pi); } @@ -1102,6 +1166,7 @@ GPR_TLS_DECL(g_current_thread_pollset); GPR_TLS_DECL(g_current_thread_worker); static __thread bool g_initialized_sigmask; static __thread sigset_t g_orig_sigmask; +static __thread sigset_t g_wakeup_sig_set; static void sig_handler(int sig_num) { #ifdef GRPC_EPOLL_DEBUG @@ -1109,6 +1174,14 @@ static void sig_handler(int sig_num) { #endif } +static void pollset_worker_init(grpc_pollset_worker *worker) { + worker->pt_id = pthread_self(); + worker->next = worker->prev = NULL; + gpr_atm_no_barrier_store(&worker->is_kicked, (gpr_atm)0); + gpr_atm_no_barrier_store(&worker->is_polling_turn, (gpr_atm)0); + worker_node_init(&worker->pi_list_link); +} + static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); } /* Global state management */ @@ -1125,11 +1198,12 @@ static void pollset_global_shutdown(void) { gpr_tls_destroy(&g_current_thread_worker); } -static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) { +static grpc_error *worker_kick(grpc_pollset_worker *worker, + gpr_atm *is_kicked) { grpc_error *err = GRPC_ERROR_NONE; /* Kick the worker only if it was not already kicked */ - if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) { + if (gpr_atm_no_barrier_cas(is_kicked, (gpr_atm)0, (gpr_atm)1)) { GRPC_POLLING_TRACE( "pollset_worker_kick: Kicking worker: %p (thread id: %ld)", (void *)worker, (long int)worker->pt_id); @@ -1141,6 +1215,14 @@ static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) { return err; } +static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) { + return worker_kick(worker, &worker->is_kicked); +} + +static grpc_error *poller_kick(grpc_pollset_worker *worker) { + return worker_kick(worker, &worker->is_polling_turn); +} + /* Return 1 if the pollset has active threads in pollset_work (pollset must * be locked) */ static int pollset_has_workers(grpc_pollset *p) { @@ -1246,6 +1328,22 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->shutdown_done = NULL; } +/* Convert millis to timespec (clock-type is assumed to be GPR_TIMESPAN) */ +static struct timespec millis_to_timespec(int millis) { + struct timespec linux_ts; + gpr_timespec gpr_ts; + + if (millis == -1) { + gpr_ts = gpr_inf_future(GPR_TIMESPAN); + } else { + gpr_ts = gpr_time_from_millis(millis, GPR_TIMESPAN); + } + + linux_ts.tv_sec = (time_t)gpr_ts.tv_sec; + linux_ts.tv_nsec = gpr_ts.tv_nsec; + return linux_ts; +} + /* Convert a timespec to milliseconds: - Very small or negative poll times are clamped to zero to do a non-blocking poll (which becomes spin polling) @@ -1364,35 +1462,190 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, return false; } +/* NOTE: May modify 'now' */ +static bool acquire_polling_lease(grpc_pollset_worker *worker, + polling_island *pi, gpr_timespec deadline, + gpr_timespec *now) { + bool is_lease_acquired = false; + + gpr_mu_lock(&pi->worker_list_mu); // Lock + long num_pollers = gpr_atm_no_barrier_load(&pi->poller_count); + + if (num_pollers >= GRPC_MAX_POLLERS_PER_ISLAND) { + push_back_worker_node(&pi->worker_list_head, &worker->pi_list_link); + gpr_mu_unlock(&pi->worker_list_mu); // Unlock + + bool is_timeout = false; + int ret; + int timeout_ms = poll_deadline_to_millis_timeout(deadline, *now); + if (timeout_ms == -1) { + ret = sigwaitinfo(&g_wakeup_sig_set, NULL); + } else { + struct timespec sigwait_timeout = millis_to_timespec(timeout_ms); + ret = sigtimedwait(&g_wakeup_sig_set, NULL, &sigwait_timeout); + } + + if (ret == -1) { + if (errno == EAGAIN) { + // gpr_log(GPR_INFO, "timeout"); // TODO: sreek remove this + // log-line + } else { + gpr_log(GPR_ERROR, "Failed with retcode: %d (timeout_ms: %d)", errno, + timeout_ms); + } + is_timeout = true; + } + + bool is_polling_turn = gpr_atm_acq_load(&worker->is_polling_turn); + /* + if (is_polling_turn) { +/ gpr_log(GPR_ERROR, "do epoll is true (timeout_ms:%d)", + timeout_ms); // TODO: sreek remove this logline + } + */ + + bool is_kicked = gpr_atm_no_barrier_load(&worker->is_kicked); + if (is_kicked || is_timeout) { + *now = deadline; + } else if (is_polling_turn) { + *now = gpr_now(GPR_CLOCK_MONOTONIC); + } + + gpr_mu_lock(&pi->worker_list_mu); // Lock + /* The node might have already been removed from the list by the poller + that kicked this. However it is safe to call 'remove_worker_node' on + an already detached node */ + remove_worker_node(&worker->pi_list_link); + num_pollers = gpr_atm_no_barrier_load(&pi->poller_count); + } + + if (num_pollers < GRPC_MAX_POLLERS_PER_ISLAND) { + gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); // Add a poller + is_lease_acquired = true; + } + + gpr_mu_unlock(&pi->worker_list_mu); // Unlock + return is_lease_acquired; +} + +static void release_polling_lease(polling_island *pi, grpc_error **error) { + gpr_mu_lock(&pi->worker_list_mu); // Lock + + gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); // Remove poller + worker_node *node = pop_front_worker_node(&pi->worker_list_head); + if (node != NULL) { + grpc_pollset_worker *next_worker = WORKER_FROM_WORKER_LIST_NODE(node); + append_error(error, poller_kick(next_worker), "poller kick error"); + } + + gpr_mu_unlock(&pi->worker_list_mu); +} + #define GRPC_EPOLL_MAX_EVENTS 100 +static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd, + grpc_pollset *pollset, polling_island *pi, + grpc_pollset_worker *worker, + gpr_timespec now, gpr_timespec deadline, + sigset_t *sig_mask, grpc_error **error) { + if (!acquire_polling_lease(worker, pi, deadline, &now)) { + return; + } + + struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; + int ep_rv; + char *err_msg; + const char *err_desc = "pollset_work_and_unlock"; + + int timeout_ms = poll_deadline_to_millis_timeout(deadline, now); + + GRPC_SCHEDULING_START_BLOCKING_REGION; + ep_rv = + epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask); + GRPC_SCHEDULING_END_BLOCKING_REGION; + + release_polling_lease(pi, error); + + if (ep_rv < 0) { + if (errno != EINTR) { + gpr_asprintf(&err_msg, + "epoll_wait() epoll fd: %d failed with error: %d (%s)", + epoll_fd, errno, strerror(errno)); + append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); + } else { + /* We were interrupted. Save an interation by doing a zero timeout + epoll_wait to see if there are any other events of interest */ + GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick", + (void *)pollset, (void *)worker); + ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); + } + } + +#ifdef GRPC_TSAN + /* See the definition of g_poll_sync for more details */ + gpr_atm_acq_load(&g_epoll_sync); +#endif /* defined(GRPC_TSAN) */ + + for (int i = 0; i < ep_rv; ++i) { + void *data_ptr = ep_ev[i].data.ptr; + if (data_ptr == &global_wakeup_fd) { + grpc_timer_consume_kick(); + append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), + err_desc); + } else if (data_ptr == &pi->workqueue_wakeup_fd) { + append_error(error, + grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd), + err_desc); + maybe_do_workqueue_work(exec_ctx, pi); + } else if (data_ptr == &polling_island_wakeup_fd) { + GRPC_POLLING_TRACE( + "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: " + "%d) got merged", + (void *)pollset, (void *)worker, epoll_fd); + /* This means that our polling island is merged with a different + island. We do not have to do anything here since the subsequent call + to the function pollset_work_and_unlock() will pick up the correct + epoll_fd */ + } else { + grpc_fd *fd = data_ptr; + int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); + int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); + int write_ev = ep_ev[i].events & EPOLLOUT; + if (read_ev || cancel) { + fd_become_readable(exec_ctx, fd, pollset); + } + if (write_ev || cancel) { + fd_become_writable(exec_ctx, fd); + } + } + } +} + /* Note: sig_mask contains the signal mask to use *during* epoll_wait() */ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, int timeout_ms, + grpc_pollset_worker *worker, + gpr_timespec now, gpr_timespec deadline, sigset_t *sig_mask, grpc_error **error) { - struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; int epoll_fd = -1; - int ep_rv; polling_island *pi = NULL; - char *err_msg; - const char *err_desc = "pollset_work_and_unlock"; GPR_TIMER_BEGIN("pollset_work_and_unlock", 0); /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the latest polling island pointed by pollset->po.pi - Since epoll_fd is immutable, we can read it without obtaining the polling - island lock. There is however a possibility that the polling island (from - which we got the epoll_fd) got merged with another island while we are - in this function. This is still okay because in such a case, we will wakeup - right-away from epoll_wait() and pick up the latest polling_island the next - this function (i.e pollset_work_and_unlock()) is called */ + Since epoll_fd is immutable, it is safe to read it without a lock on the + polling island. There is however a possibility that the polling island from + which we got the epoll_fd, got merged with another island in the meantime. + This is okay because in such a case, we will wakeup right-away from + epoll_pwait() (because any merge will poison the old polling island's epoll + set 'polling_island_wakeup_fd') and then pick up the latest polling_island + the next time this function - pollset_work_and_unlock()) is called */ if (pollset->po.pi == NULL) { pollset->po.pi = polling_island_create(exec_ctx, NULL, error); if (pollset->po.pi == NULL) { GPR_TIMER_END("pollset_work_and_unlock", 0); - return; /* Fatal error. We cannot continue */ + return; /* Fatal error. Cannot continue */ } PI_ADD_REF(pollset->po.pi, "ps"); @@ -1423,70 +1676,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, the completion queue, so there's no need to poll... so we skip that and redo the complete loop to verify */ if (!maybe_do_workqueue_work(exec_ctx, pi)) { - gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); g_current_thread_polling_island = pi; - - GRPC_SCHEDULING_START_BLOCKING_REGION; - ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, - sig_mask); - GRPC_SCHEDULING_END_BLOCKING_REGION; - if (ep_rv < 0) { - if (errno != EINTR) { - gpr_asprintf(&err_msg, - "epoll_wait() epoll fd: %d failed with error: %d (%s)", - epoll_fd, errno, strerror(errno)); - append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); - } else { - /* We were interrupted. Save an interation by doing a zero timeout - epoll_wait to see if there are any other events of interest */ - GRPC_POLLING_TRACE( - "pollset_work: pollset: %p, worker: %p received kick", - (void *)pollset, (void *)worker); - ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); - } - } - -#ifdef GRPC_TSAN - /* See the definition of g_poll_sync for more details */ - gpr_atm_acq_load(&g_epoll_sync); -#endif /* defined(GRPC_TSAN) */ - - for (int i = 0; i < ep_rv; ++i) { - void *data_ptr = ep_ev[i].data.ptr; - if (data_ptr == &global_wakeup_fd) { - grpc_timer_consume_kick(); - append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), - err_desc); - } else if (data_ptr == &pi->workqueue_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd), - err_desc); - maybe_do_workqueue_work(exec_ctx, pi); - } else if (data_ptr == &polling_island_wakeup_fd) { - GRPC_POLLING_TRACE( - "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: " - "%d) got merged", - (void *)pollset, (void *)worker, epoll_fd); - /* This means that our polling island is merged with a different - island. We do not have to do anything here since the subsequent call - to the function pollset_work_and_unlock() will pick up the correct - epoll_fd */ - } else { - grpc_fd *fd = data_ptr; - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write_ev = ep_ev[i].events & EPOLLOUT; - if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd, pollset); - } - if (write_ev || cancel) { - fd_become_writable(exec_ctx, fd); - } - } - } - + pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now, + deadline, sig_mask, error); g_current_thread_polling_island = NULL; - gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); } GPR_ASSERT(pi != NULL); @@ -1510,14 +1703,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_timespec now, gpr_timespec deadline) { GPR_TIMER_BEGIN("pollset_work", 0); grpc_error *error = GRPC_ERROR_NONE; - int timeout_ms = poll_deadline_to_millis_timeout(deadline, now); - - sigset_t new_mask; grpc_pollset_worker worker; - worker.next = worker.prev = NULL; - worker.pt_id = pthread_self(); - gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0); + pollset_worker_init(&worker); if (worker_hdl) *worker_hdl = &worker; @@ -1551,9 +1739,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, misses acting on a kick */ if (!g_initialized_sigmask) { - sigemptyset(&new_mask); - sigaddset(&new_mask, grpc_wakeup_signal); - pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask); + sigemptyset(&g_wakeup_sig_set); + sigaddset(&g_wakeup_sig_set, grpc_wakeup_signal); + pthread_sigmask(SIG_BLOCK, &g_wakeup_sig_set, &g_orig_sigmask); sigdelset(&g_orig_sigmask, grpc_wakeup_signal); g_initialized_sigmask = true; /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'. @@ -1568,7 +1756,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, push_front_worker(pollset, &worker); /* Add worker to pollset */ - pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms, + pollset_work_and_unlock(exec_ctx, pollset, &worker, now, deadline, &g_orig_sigmask, &error); grpc_exec_ctx_flush(exec_ctx); diff --git a/test/core/iomgr/ev_epoll_linux_test.c b/test/core/iomgr/ev_epoll_linux_test.c index 0856023b14..f7745cddb4 100644 --- a/test/core/iomgr/ev_epoll_linux_test.c +++ b/test/core/iomgr/ev_epoll_linux_test.c @@ -38,7 +38,10 @@ #include "src/core/lib/iomgr/ev_posix.h" #include <errno.h> +#include <signal.h> +#include <stdio.h> #include <string.h> +#include <sys/time.h> #include <unistd.h> #include <grpc/support/alloc.h> @@ -327,7 +330,7 @@ static __thread int thread_wakeups = 0; static void test_threading_loop(void *arg) { threading_shared *shared = arg; - while (thread_wakeups < 1000000) { + while (thread_wakeups < 20000) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_worker *worker; gpr_mu_lock(shared->mu); @@ -360,7 +363,7 @@ static void test_threading(void) { shared.pollset = gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(shared.pollset, &shared.mu); - gpr_thd_id thds[10]; + gpr_thd_id thds[20]; for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options_set_joinable(&opt); @@ -399,6 +402,44 @@ static void test_threading(void) { gpr_free(shared.pollset); } +/* Convert milliseconds into 'struct timespec' struct. millis == -1 is + * * considered as an infinity-time in future */ +static struct timespec millis_to_timespec(int millis) { + struct timespec linux_ts; + gpr_timespec gpr_ts; + + if (millis == -1) { + gpr_ts = gpr_inf_future(GPR_TIMESPAN); + } else { + gpr_ts = gpr_time_from_millis(millis, GPR_TIMESPAN); + } + + linux_ts.tv_sec = (time_t)gpr_ts.tv_sec; + linux_ts.tv_nsec = gpr_ts.tv_nsec; + return linux_ts; +} + +void test_sigwait() { + sigset_t wakeup_sig_set; + sigemptyset(&wakeup_sig_set); + sigaddset(&wakeup_sig_set, SIGRTMIN + 6); + int timeout_ms[] = {10, 1400}; + + for (size_t i = 0; i < GPR_ARRAY_SIZE(timeout_ms); i++) { + struct timespec sigwait_timeout = millis_to_timespec(timeout_ms[i]); + gpr_log(GPR_ERROR, "sigwait_timeout: %ld, %ld", sigwait_timeout.tv_sec, + sigwait_timeout.tv_nsec); + + gpr_log(GPR_ERROR, "Waiting for %d ms...", timeout_ms[i]); + gpr_timespec bef = gpr_now(GPR_CLOCK_REALTIME); + sigtimedwait(&wakeup_sig_set, NULL, &sigwait_timeout); + gpr_timespec af = gpr_now(GPR_CLOCK_REALTIME); + + gpr_log(GPR_ERROR, "Bef: %ld, %d", bef.tv_sec, bef.tv_nsec); + gpr_log(GPR_ERROR, "Aft: %ld, %d", af.tv_sec, af.tv_nsec); + } +} + int main(int argc, char **argv) { const char *poll_strategy = NULL; grpc_test_init(argc, argv); @@ -409,6 +450,7 @@ int main(int argc, char **argv) { test_add_fd_to_pollset(); test_pollset_queue_merge_items(); test_threading(); + test_sigwait(); } else { gpr_log(GPR_INFO, "Skipping the test. The test is only relevant for 'epoll' " |