aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2017-04-19 19:37:39 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2017-04-21 09:53:42 -0700
commit1898a29f7ecf2ac4f706351d68dedd5767f02ec9 (patch)
tree31eb17dbb3fab45f493ab3791bf1d416ebe42199
parentbcdf9e8d899bf5018978c4f3635c51df3f0c7d09 (diff)
Fixed pollers
-rw-r--r--include/grpc++/server_builder.h2
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c362
-rw-r--r--test/core/iomgr/ev_epoll_linux_test.c46
3 files changed, 320 insertions, 90 deletions
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index 7ac23349c8..68c5344e2a 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -195,7 +195,7 @@ class ServerBuilder {
struct SyncServerSettings {
SyncServerSettings()
- : num_cqs(1), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
+ : num_cqs(gpr_cpu_num_cores()), min_pollers(1), max_pollers(2), cq_timeout_msec(10000) {}
// Number of server completion queues to create to listen to incoming RPCs.
int num_cqs;
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index e603a75593..16a9199ab9 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -97,6 +97,9 @@ void grpc_use_signal(int signum) {
}
}
+/* The maximum number of polling threads per polling island */
+#define GRPC_MAX_POLLERS_PER_ISLAND 1
+
struct polling_island;
typedef enum {
@@ -195,6 +198,11 @@ static void fd_global_shutdown(void);
#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
+typedef struct worker_node {
+ struct worker_node *next;
+ struct worker_node *prev;
+} worker_node;
+
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
grpc_closure_scheduler workqueue_scheduler;
@@ -229,6 +237,9 @@ typedef struct polling_island {
/* Wakeup fd used to wake pollers to check the contents of workqueue_items */
grpc_wakeup_fd workqueue_wakeup_fd;
+ gpr_mu worker_list_mu;
+ worker_node worker_list_head;
+
/* The fd of the underlying epoll set */
int epoll_fd;
@@ -241,14 +252,21 @@ typedef struct polling_island {
/*******************************************************************************
* Pollset Declarations
*/
+#define WORKER_FROM_WORKER_LIST_NODE(p) \
+ (struct grpc_pollset_worker *)(((char *)(p)) - \
+ offsetof(grpc_pollset_worker, pi_list_link))
struct grpc_pollset_worker {
/* Thread id of this worker */
pthread_t pt_id;
/* Used to prevent a worker from getting kicked multiple times */
gpr_atm is_kicked;
+
struct grpc_pollset_worker *next;
struct grpc_pollset_worker *prev;
+
+ gpr_atm is_polling_turn;
+ worker_node pi_list_link;
};
struct grpc_pollset {
@@ -392,7 +410,47 @@ static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
}
}
-/* The caller is expected to hold pi->mu lock before calling this function */
+static void worker_node_init(worker_node *node) {
+ node->next = node->prev = node;
+}
+
+/* Not thread safe. Do under a list-level lock */
+static void push_back_worker_node(worker_node *head, worker_node *node) {
+ node->next = head;
+ node->prev = head->prev;
+ head->prev->next = node;
+ head->prev = node;
+}
+
+/* Not thread safe. Do under a list-level lock */
+static void remove_worker_node(worker_node *node) {
+ node->next->prev = node->prev;
+ node->prev->next = node->next;
+ /* If node's next and prev point to itself, the node is considered detached
+ * from the list*/
+ node->next = node->prev = node;
+}
+
+/* Not thread safe. Do under a list-level lock */
+static worker_node *pop_front_worker_node(worker_node *head) {
+ worker_node *node = head->next;
+ if (node != head) {
+ remove_worker_node(node);
+ } else {
+ node = NULL;
+ }
+
+ return node;
+}
+
+/* Returns true if the node's next and prev are pointing to itself (which
+ indicates that the node is not in the list */
+static bool is_worker_node_detached(worker_node *node) {
+ return (node->next == node->prev && node->next == node);
+}
+
+/* The caller is expected to hold pi->mu lock before calling this function
+ */
static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
size_t fd_count, bool add_fd_refs,
grpc_error **error) {
@@ -546,6 +604,9 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
gpr_atm_rel_store(&pi->poller_count, 0);
gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
+ gpr_mu_init(&pi->worker_list_mu);
+ worker_node_init(&pi->worker_list_head);
+
if (!append_error(error, grpc_wakeup_fd_init(&pi->workqueue_wakeup_fd),
err_desc)) {
goto done;
@@ -584,6 +645,9 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
gpr_mpscq_destroy(&pi->workqueue_items);
gpr_mu_destroy(&pi->mu);
grpc_wakeup_fd_destroy(&pi->workqueue_wakeup_fd);
+ gpr_mu_destroy(&pi->worker_list_mu);
+ GPR_ASSERT(is_worker_node_detached(&pi->worker_list_head));
+
gpr_free(pi->fds);
gpr_free(pi);
}
@@ -1102,6 +1166,7 @@ GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static __thread bool g_initialized_sigmask;
static __thread sigset_t g_orig_sigmask;
+static __thread sigset_t g_wakeup_sig_set;
static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
@@ -1109,6 +1174,14 @@ static void sig_handler(int sig_num) {
#endif
}
+static void pollset_worker_init(grpc_pollset_worker *worker) {
+ worker->pt_id = pthread_self();
+ worker->next = worker->prev = NULL;
+ gpr_atm_no_barrier_store(&worker->is_kicked, (gpr_atm)0);
+ gpr_atm_no_barrier_store(&worker->is_polling_turn, (gpr_atm)0);
+ worker_node_init(&worker->pi_list_link);
+}
+
static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
/* Global state management */
@@ -1125,11 +1198,12 @@ static void pollset_global_shutdown(void) {
gpr_tls_destroy(&g_current_thread_worker);
}
-static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
+static grpc_error *worker_kick(grpc_pollset_worker *worker,
+ gpr_atm *is_kicked) {
grpc_error *err = GRPC_ERROR_NONE;
/* Kick the worker only if it was not already kicked */
- if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) {
+ if (gpr_atm_no_barrier_cas(is_kicked, (gpr_atm)0, (gpr_atm)1)) {
GRPC_POLLING_TRACE(
"pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
(void *)worker, (long int)worker->pt_id);
@@ -1141,6 +1215,14 @@ static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
return err;
}
+static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
+ return worker_kick(worker, &worker->is_kicked);
+}
+
+static grpc_error *poller_kick(grpc_pollset_worker *worker) {
+ return worker_kick(worker, &worker->is_polling_turn);
+}
+
/* Return 1 if the pollset has active threads in pollset_work (pollset must
* be locked) */
static int pollset_has_workers(grpc_pollset *p) {
@@ -1246,6 +1328,22 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->shutdown_done = NULL;
}
+/* Convert millis to timespec (clock-type is assumed to be GPR_TIMESPAN) */
+static struct timespec millis_to_timespec(int millis) {
+ struct timespec linux_ts;
+ gpr_timespec gpr_ts;
+
+ if (millis == -1) {
+ gpr_ts = gpr_inf_future(GPR_TIMESPAN);
+ } else {
+ gpr_ts = gpr_time_from_millis(millis, GPR_TIMESPAN);
+ }
+
+ linux_ts.tv_sec = (time_t)gpr_ts.tv_sec;
+ linux_ts.tv_nsec = gpr_ts.tv_nsec;
+ return linux_ts;
+}
+
/* Convert a timespec to milliseconds:
- Very small or negative poll times are clamped to zero to do a non-blocking
poll (which becomes spin polling)
@@ -1364,35 +1462,190 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
return false;
}
+/* NOTE: May modify 'now' */
+static bool acquire_polling_lease(grpc_pollset_worker *worker,
+ polling_island *pi, gpr_timespec deadline,
+ gpr_timespec *now) {
+ bool is_lease_acquired = false;
+
+ gpr_mu_lock(&pi->worker_list_mu); // Lock
+ long num_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
+
+ if (num_pollers >= GRPC_MAX_POLLERS_PER_ISLAND) {
+ push_back_worker_node(&pi->worker_list_head, &worker->pi_list_link);
+ gpr_mu_unlock(&pi->worker_list_mu); // Unlock
+
+ bool is_timeout = false;
+ int ret;
+ int timeout_ms = poll_deadline_to_millis_timeout(deadline, *now);
+ if (timeout_ms == -1) {
+ ret = sigwaitinfo(&g_wakeup_sig_set, NULL);
+ } else {
+ struct timespec sigwait_timeout = millis_to_timespec(timeout_ms);
+ ret = sigtimedwait(&g_wakeup_sig_set, NULL, &sigwait_timeout);
+ }
+
+ if (ret == -1) {
+ if (errno == EAGAIN) {
+ // gpr_log(GPR_INFO, "timeout"); // TODO: sreek remove this
+ // log-line
+ } else {
+ gpr_log(GPR_ERROR, "Failed with retcode: %d (timeout_ms: %d)", errno,
+ timeout_ms);
+ }
+ is_timeout = true;
+ }
+
+ bool is_polling_turn = gpr_atm_acq_load(&worker->is_polling_turn);
+ /*
+ if (is_polling_turn) {
+/ gpr_log(GPR_ERROR, "do epoll is true (timeout_ms:%d)",
+ timeout_ms); // TODO: sreek remove this logline
+ }
+ */
+
+ bool is_kicked = gpr_atm_no_barrier_load(&worker->is_kicked);
+ if (is_kicked || is_timeout) {
+ *now = deadline;
+ } else if (is_polling_turn) {
+ *now = gpr_now(GPR_CLOCK_MONOTONIC);
+ }
+
+ gpr_mu_lock(&pi->worker_list_mu); // Lock
+ /* The node might have already been removed from the list by the poller
+ that kicked this. However it is safe to call 'remove_worker_node' on
+ an already detached node */
+ remove_worker_node(&worker->pi_list_link);
+ num_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
+ }
+
+ if (num_pollers < GRPC_MAX_POLLERS_PER_ISLAND) {
+ gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1); // Add a poller
+ is_lease_acquired = true;
+ }
+
+ gpr_mu_unlock(&pi->worker_list_mu); // Unlock
+ return is_lease_acquired;
+}
+
+static void release_polling_lease(polling_island *pi, grpc_error **error) {
+ gpr_mu_lock(&pi->worker_list_mu); // Lock
+
+ gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1); // Remove poller
+ worker_node *node = pop_front_worker_node(&pi->worker_list_head);
+ if (node != NULL) {
+ grpc_pollset_worker *next_worker = WORKER_FROM_WORKER_LIST_NODE(node);
+ append_error(error, poller_kick(next_worker), "poller kick error");
+ }
+
+ gpr_mu_unlock(&pi->worker_list_mu);
+}
+
#define GRPC_EPOLL_MAX_EVENTS 100
+static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
+ grpc_pollset *pollset, polling_island *pi,
+ grpc_pollset_worker *worker,
+ gpr_timespec now, gpr_timespec deadline,
+ sigset_t *sig_mask, grpc_error **error) {
+ if (!acquire_polling_lease(worker, pi, deadline, &now)) {
+ return;
+ }
+
+ struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
+ int ep_rv;
+ char *err_msg;
+ const char *err_desc = "pollset_work_and_unlock";
+
+ int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
+
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ ep_rv =
+ epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+
+ release_polling_lease(pi, error);
+
+ if (ep_rv < 0) {
+ if (errno != EINTR) {
+ gpr_asprintf(&err_msg,
+ "epoll_wait() epoll fd: %d failed with error: %d (%s)",
+ epoll_fd, errno, strerror(errno));
+ append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
+ } else {
+ /* We were interrupted. Save an interation by doing a zero timeout
+ epoll_wait to see if there are any other events of interest */
+ GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick",
+ (void *)pollset, (void *)worker);
+ ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
+ }
+ }
+
+#ifdef GRPC_TSAN
+ /* See the definition of g_poll_sync for more details */
+ gpr_atm_acq_load(&g_epoll_sync);
+#endif /* defined(GRPC_TSAN) */
+
+ for (int i = 0; i < ep_rv; ++i) {
+ void *data_ptr = ep_ev[i].data.ptr;
+ if (data_ptr == &global_wakeup_fd) {
+ grpc_timer_consume_kick();
+ append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
+ err_desc);
+ } else if (data_ptr == &pi->workqueue_wakeup_fd) {
+ append_error(error,
+ grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
+ err_desc);
+ maybe_do_workqueue_work(exec_ctx, pi);
+ } else if (data_ptr == &polling_island_wakeup_fd) {
+ GRPC_POLLING_TRACE(
+ "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
+ "%d) got merged",
+ (void *)pollset, (void *)worker, epoll_fd);
+ /* This means that our polling island is merged with a different
+ island. We do not have to do anything here since the subsequent call
+ to the function pollset_work_and_unlock() will pick up the correct
+ epoll_fd */
+ } else {
+ grpc_fd *fd = data_ptr;
+ int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
+ int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
+ int write_ev = ep_ev[i].events & EPOLLOUT;
+ if (read_ev || cancel) {
+ fd_become_readable(exec_ctx, fd, pollset);
+ }
+ if (write_ev || cancel) {
+ fd_become_writable(exec_ctx, fd);
+ }
+ }
+ }
+}
+
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset,
- grpc_pollset_worker *worker, int timeout_ms,
+ grpc_pollset_worker *worker,
+ gpr_timespec now, gpr_timespec deadline,
sigset_t *sig_mask, grpc_error **error) {
- struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
int epoll_fd = -1;
- int ep_rv;
polling_island *pi = NULL;
- char *err_msg;
- const char *err_desc = "pollset_work_and_unlock";
GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
/* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
latest polling island pointed by pollset->po.pi
- Since epoll_fd is immutable, we can read it without obtaining the polling
- island lock. There is however a possibility that the polling island (from
- which we got the epoll_fd) got merged with another island while we are
- in this function. This is still okay because in such a case, we will wakeup
- right-away from epoll_wait() and pick up the latest polling_island the next
- this function (i.e pollset_work_and_unlock()) is called */
+ Since epoll_fd is immutable, it is safe to read it without a lock on the
+ polling island. There is however a possibility that the polling island from
+ which we got the epoll_fd, got merged with another island in the meantime.
+ This is okay because in such a case, we will wakeup right-away from
+ epoll_pwait() (because any merge will poison the old polling island's epoll
+ set 'polling_island_wakeup_fd') and then pick up the latest polling_island
+ the next time this function - pollset_work_and_unlock()) is called */
if (pollset->po.pi == NULL) {
pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
if (pollset->po.pi == NULL) {
GPR_TIMER_END("pollset_work_and_unlock", 0);
- return; /* Fatal error. We cannot continue */
+ return; /* Fatal error. Cannot continue */
}
PI_ADD_REF(pollset->po.pi, "ps");
@@ -1423,70 +1676,10 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
the completion queue, so there's no need to poll... so we skip that and
redo the complete loop to verify */
if (!maybe_do_workqueue_work(exec_ctx, pi)) {
- gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
g_current_thread_polling_island = pi;
-
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- ep_rv = epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms,
- sig_mask);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
- if (ep_rv < 0) {
- if (errno != EINTR) {
- gpr_asprintf(&err_msg,
- "epoll_wait() epoll fd: %d failed with error: %d (%s)",
- epoll_fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- } else {
- /* We were interrupted. Save an interation by doing a zero timeout
- epoll_wait to see if there are any other events of interest */
- GRPC_POLLING_TRACE(
- "pollset_work: pollset: %p, worker: %p received kick",
- (void *)pollset, (void *)worker);
- ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
- }
- }
-
-#ifdef GRPC_TSAN
- /* See the definition of g_poll_sync for more details */
- gpr_atm_acq_load(&g_epoll_sync);
-#endif /* defined(GRPC_TSAN) */
-
- for (int i = 0; i < ep_rv; ++i) {
- void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &global_wakeup_fd) {
- grpc_timer_consume_kick();
- append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
- err_desc);
- } else if (data_ptr == &pi->workqueue_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&pi->workqueue_wakeup_fd),
- err_desc);
- maybe_do_workqueue_work(exec_ctx, pi);
- } else if (data_ptr == &polling_island_wakeup_fd) {
- GRPC_POLLING_TRACE(
- "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
- "%d) got merged",
- (void *)pollset, (void *)worker, epoll_fd);
- /* This means that our polling island is merged with a different
- island. We do not have to do anything here since the subsequent call
- to the function pollset_work_and_unlock() will pick up the correct
- epoll_fd */
- } else {
- grpc_fd *fd = data_ptr;
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd, pollset);
- }
- if (write_ev || cancel) {
- fd_become_writable(exec_ctx, fd);
- }
- }
- }
-
+ pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now,
+ deadline, sig_mask, error);
g_current_thread_polling_island = NULL;
- gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
}
GPR_ASSERT(pi != NULL);
@@ -1510,14 +1703,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_timespec now, gpr_timespec deadline) {
GPR_TIMER_BEGIN("pollset_work", 0);
grpc_error *error = GRPC_ERROR_NONE;
- int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
-
- sigset_t new_mask;
grpc_pollset_worker worker;
- worker.next = worker.prev = NULL;
- worker.pt_id = pthread_self();
- gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);
+ pollset_worker_init(&worker);
if (worker_hdl) *worker_hdl = &worker;
@@ -1551,9 +1739,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
misses acting on a kick */
if (!g_initialized_sigmask) {
- sigemptyset(&new_mask);
- sigaddset(&new_mask, grpc_wakeup_signal);
- pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
+ sigemptyset(&g_wakeup_sig_set);
+ sigaddset(&g_wakeup_sig_set, grpc_wakeup_signal);
+ pthread_sigmask(SIG_BLOCK, &g_wakeup_sig_set, &g_orig_sigmask);
sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
g_initialized_sigmask = true;
/* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
@@ -1568,7 +1756,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
push_front_worker(pollset, &worker); /* Add worker to pollset */
- pollset_work_and_unlock(exec_ctx, pollset, &worker, timeout_ms,
+ pollset_work_and_unlock(exec_ctx, pollset, &worker, now, deadline,
&g_orig_sigmask, &error);
grpc_exec_ctx_flush(exec_ctx);
diff --git a/test/core/iomgr/ev_epoll_linux_test.c b/test/core/iomgr/ev_epoll_linux_test.c
index 0856023b14..f7745cddb4 100644
--- a/test/core/iomgr/ev_epoll_linux_test.c
+++ b/test/core/iomgr/ev_epoll_linux_test.c
@@ -38,7 +38,10 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include <errno.h>
+#include <signal.h>
+#include <stdio.h>
#include <string.h>
+#include <sys/time.h>
#include <unistd.h>
#include <grpc/support/alloc.h>
@@ -327,7 +330,7 @@ static __thread int thread_wakeups = 0;
static void test_threading_loop(void *arg) {
threading_shared *shared = arg;
- while (thread_wakeups < 1000000) {
+ while (thread_wakeups < 20000) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker *worker;
gpr_mu_lock(shared->mu);
@@ -360,7 +363,7 @@ static void test_threading(void) {
shared.pollset = gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(shared.pollset, &shared.mu);
- gpr_thd_id thds[10];
+ gpr_thd_id thds[20];
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
gpr_thd_options opt = gpr_thd_options_default();
gpr_thd_options_set_joinable(&opt);
@@ -399,6 +402,44 @@ static void test_threading(void) {
gpr_free(shared.pollset);
}
+/* Convert milliseconds into 'struct timespec' struct. millis == -1 is
+ * * considered as an infinity-time in future */
+static struct timespec millis_to_timespec(int millis) {
+ struct timespec linux_ts;
+ gpr_timespec gpr_ts;
+
+ if (millis == -1) {
+ gpr_ts = gpr_inf_future(GPR_TIMESPAN);
+ } else {
+ gpr_ts = gpr_time_from_millis(millis, GPR_TIMESPAN);
+ }
+
+ linux_ts.tv_sec = (time_t)gpr_ts.tv_sec;
+ linux_ts.tv_nsec = gpr_ts.tv_nsec;
+ return linux_ts;
+}
+
+void test_sigwait() {
+ sigset_t wakeup_sig_set;
+ sigemptyset(&wakeup_sig_set);
+ sigaddset(&wakeup_sig_set, SIGRTMIN + 6);
+ int timeout_ms[] = {10, 1400};
+
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(timeout_ms); i++) {
+ struct timespec sigwait_timeout = millis_to_timespec(timeout_ms[i]);
+ gpr_log(GPR_ERROR, "sigwait_timeout: %ld, %ld", sigwait_timeout.tv_sec,
+ sigwait_timeout.tv_nsec);
+
+ gpr_log(GPR_ERROR, "Waiting for %d ms...", timeout_ms[i]);
+ gpr_timespec bef = gpr_now(GPR_CLOCK_REALTIME);
+ sigtimedwait(&wakeup_sig_set, NULL, &sigwait_timeout);
+ gpr_timespec af = gpr_now(GPR_CLOCK_REALTIME);
+
+ gpr_log(GPR_ERROR, "Bef: %ld, %d", bef.tv_sec, bef.tv_nsec);
+ gpr_log(GPR_ERROR, "Aft: %ld, %d", af.tv_sec, af.tv_nsec);
+ }
+}
+
int main(int argc, char **argv) {
const char *poll_strategy = NULL;
grpc_test_init(argc, argv);
@@ -409,6 +450,7 @@ int main(int argc, char **argv) {
test_add_fd_to_pollset();
test_pollset_queue_merge_items();
test_threading();
+ test_sigwait();
} else {
gpr_log(GPR_INFO,
"Skipping the test. The test is only relevant for 'epoll' "