aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-09-28 17:03:34 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-09-28 17:03:34 -0700
commit8afeec88380c1e226949f0017540cb646d37eb5e (patch)
treeca53a88ad1e6f8d24c7bc10ebb2a92a7ca4776e8 /src
parent10ce383e1bfc84ffdfaba082bd413be67c5146d6 (diff)
Fix alarms
- make kick_poller() do something on POSIX - fix some conditions whereby alarms are held in a pollset exec context for too long - make channel_connectivity tests dependent on the correct behavior
Diffstat (limited to 'src')
-rw-r--r--src/core/iomgr/iomgr.c7
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c25
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c22
-rw-r--r--src/core/iomgr/pollset_posix.c43
-rw-r--r--src/core/iomgr/pollset_posix.h1
5 files changed, 64 insertions, 34 deletions
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 612419b70e..d8d84f7457 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -51,13 +51,6 @@ static gpr_cv g_rcv;
static int g_shutdown;
static grpc_iomgr_object g_root_object;
-void grpc_kick_poller(void) {
- /* Empty. The background callback executor polls periodically. The activity
- * the kicker is trying to draw the executor's attention to will be picked up
- * either by one of the periodic wakeups or by one of the polling application
- * threads. */
-}
-
void grpc_iomgr_init(void) {
g_shutdown = 0;
gpr_mu_init(&g_mu);
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index a4293eb4a4..bcb36b472b 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -211,11 +211,15 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
int write = ep_ev[i].events & EPOLLOUT;
- if (read || cancel) {
- grpc_fd_become_readable(exec_ctx, fd);
- }
- if (write || cancel) {
- grpc_fd_become_writable(exec_ctx, fd);
+ if (fd == NULL) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ } else {
+ if (read || cancel) {
+ grpc_fd_become_readable(exec_ctx, fd);
+ }
+ if (write || cancel) {
+ grpc_fd_become_writable(exec_ctx, fd);
+ }
}
}
}
@@ -244,6 +248,8 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
size_t nfds) {
size_t i;
pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
+ epoll_event ev;
+ int err;
pollset->vtable = &multipoll_with_epoll_pollset;
pollset->data.ptr = h;
@@ -253,6 +259,15 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
abort();
}
+
+ ev.events = (uint32_t)(EPOLLIN | EPOLLET);
+ ev.data.ptr = NULL;
+ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
+ if (err < 0) {
+ gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
+ strerror(errno));
+ }
+
for (i = 0; i < nfds; i++) {
multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
}
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 44031b8ef6..240e9daf8e 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -114,13 +114,16 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
h = pollset->data.ptr;
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
/* TODO(ctiller): perform just one malloc here if we exceed the inline case */
- pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 1));
- watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 1));
+ pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2));
+ watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2));
fd_count = 0;
- pfd_count = 1;
- pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfd_count = 2;
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
pfds[0].events = POLLIN;
- pfds[0].revents = POLLOUT;
+ pfds[0].revents = 0;
+ pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfds[1].events = POLLIN;
+ pfds[1].revents = 0;
for (i = 0; i < h->fd_count; i++) {
int remove = grpc_fd_is_orphaned(h->fds[i]);
for (j = 0; !remove && j < h->del_count; j++) {
@@ -143,7 +146,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
h->fd_count = fd_count;
gpr_mu_unlock(&pollset->mu);
- for (i = 1; i < pfd_count; i++) {
+ for (i = 2; i < pfd_count; i++) {
pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN,
POLLOUT, &watchers[i]);
}
@@ -152,7 +155,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
r = grpc_poll_function(pfds, pfd_count, timeout);
GRPC_SCHEDULING_END_BLOCKING_REGION;
- for (i = 1; i < pfd_count; i++) {
+ for (i = 2; i < pfd_count; i++) {
grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN,
pfds[i].revents & POLLOUT);
}
@@ -165,9 +168,12 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock(
/* do nothing */
} else {
if (pfds[0].revents & POLLIN) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ }
+ if (pfds[1].revents & POLLIN) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
- for (i = 1; i < pfd_count; i++) {
+ for (i = 2; i < pfd_count; i++) {
if (watchers[i].fd == NULL) {
continue;
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index e80963e0ea..7adb0e626e 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -58,6 +58,7 @@ GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
grpc_poll_function_type grpc_poll_function = poll;
+grpc_wakeup_fd grpc_global_wakeup_fd;
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next;
@@ -121,12 +122,18 @@ void grpc_pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
grpc_wakeup_fd_global_init();
+ grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
void grpc_pollset_global_shutdown(void) {
+ grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ grpc_wakeup_fd_global_destroy();
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
- grpc_wakeup_fd_global_destroy();
+}
+
+void grpc_kick_poller(void) {
+ grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}
/* main interface */
@@ -193,6 +200,8 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
goto done;
}
if (grpc_alarm_check(exec_ctx, now, &deadline)) {
+ gpr_mu_unlock(&pollset->mu);
+ locked = 0;
goto done;
}
if (pollset->shutting_down) {
@@ -294,7 +303,7 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
}
timeout = gpr_time_sub(deadline, now);
return gpr_time_to_millis(gpr_time_add(
- timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN)));
+ timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
/*
@@ -439,7 +448,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
grpc_pollset_worker *worker,
gpr_timespec deadline,
gpr_timespec now) {
- struct pollfd pfd[2];
+ struct pollfd pfd[3];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
int timeout;
@@ -452,17 +461,20 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
fd = pollset->data.ptr = NULL;
}
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
- pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
pfd[0].events = POLLIN;
pfd[0].revents = 0;
- nfds = 1;
+ pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfd[1].events = POLLIN;
+ pfd[1].revents = 0;
+ nfds = 2;
if (fd) {
- pfd[1].fd = fd->fd;
- pfd[1].revents = 0;
+ pfd[2].fd = fd->fd;
+ pfd[2].revents = 0;
gpr_mu_unlock(&pollset->mu);
- pfd[1].events =
+ pfd[2].events =
(short)grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
- if (pfd[1].events != 0) {
+ if (pfd[2].events != 0) {
nfds++;
}
} else {
@@ -477,8 +489,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) {
- grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[1].revents & POLLIN,
- pfd[1].revents & POLLOUT);
+ grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN,
+ pfd[2].revents & POLLOUT);
}
if (r < 0) {
@@ -489,13 +501,16 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
/* do nothing */
} else {
if (pfd[0].revents & POLLIN) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ }
+ if (pfd[1].revents & POLLIN) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
- if (nfds > 1) {
- if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
+ if (nfds > 2) {
+ if (pfd[2].revents & (POLLIN | POLLHUP | POLLERR)) {
grpc_fd_become_readable(exec_ctx, fd);
}
- if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) {
+ if (pfd[2].revents & (POLLOUT | POLLHUP | POLLERR)) {
grpc_fd_become_writable(exec_ctx, fd);
}
}
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index f996dd1edf..83c5258539 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -129,5 +129,6 @@ int grpc_pollset_has_workers(grpc_pollset *pollset);
/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
+extern grpc_wakeup_fd grpc_global_wakeup_fd;
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */