aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/pollset_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/pollset_posix.c')
-rw-r--r--src/core/iomgr/pollset_posix.c179
1 files changed, 111 insertions, 68 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index f3e424e83c..82a82cc064 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -47,6 +47,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/profiling/timers.h"
+#include "src/core/support/block_annotate.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
@@ -56,8 +57,16 @@
GPR_TLS_DECL(g_current_thread_poller);
GPR_TLS_DECL(g_current_thread_worker);
+/** Default poll() function - a pointer so that it can be overridden by some
+ * tests */
grpc_poll_function_type grpc_poll_function = poll;
+/** The alarm system needs to be able to wakeup 'some poller' sometimes
+ * (specifically when a new alarm needs to be triggered earlier than the next
+ * alarm 'epoch').
+ * This wakeup_fd gives us something to alert on when such a case occurs. */
+grpc_wakeup_fd grpc_global_wakeup_fd;
+
static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
worker->prev->next = worker->next;
worker->next->prev = worker->prev;
@@ -118,12 +127,20 @@ void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
void grpc_pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
+ gpr_tls_init(&g_current_thread_worker);
grpc_wakeup_fd_global_init();
+ grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
void grpc_pollset_global_shutdown(void) {
- gpr_tls_destroy(&g_current_thread_poller);
+ grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
grpc_wakeup_fd_global_destroy();
+ gpr_tls_destroy(&g_current_thread_poller);
+ gpr_tls_destroy(&g_current_thread_worker);
+}
+
+void grpc_kick_poller(void) {
+ grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
}
/* main interface */
@@ -136,12 +153,14 @@ void grpc_pollset_init(grpc_pollset *pollset) {
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
+ pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
become_basic_pollset(pollset, NULL);
}
-void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
+void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
gpr_mu_lock(&pollset->mu);
- pollset->vtable->add_fd(pollset, fd, 1);
+ pollset->vtable->add_fd(exec_ctx, pollset, fd, 1);
/* the following (enabled only in debug) will reacquire and then release
our lock - meaning that if the unlocking flag passed to del_fd above is
not respected, the code will deadlock (in a way that we have a chance of
@@ -152,9 +171,10 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
#endif
}
-void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
+void grpc_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
gpr_mu_lock(&pollset->mu);
- pollset->vtable->del_fd(pollset, fd, 1);
+ pollset->vtable->del_fd(exec_ctx, pollset, fd, 1);
/* the following (enabled only in debug) will reacquire and then release
our lock - meaning that if the unlocking flag passed to del_fd above is
not respected, the code will deadlock (in a way that we have a chance of
@@ -165,23 +185,30 @@ void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
#endif
}
-static void finish_shutdown(grpc_pollset *pollset) {
+static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
+ GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
pollset->vtable->finish_shutdown(pollset);
- pollset->shutdown_done_cb(pollset->shutdown_done_arg);
+ grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, 1);
}
-void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
- gpr_timespec now, gpr_timespec deadline) {
+void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker *worker, gpr_timespec now,
+ gpr_timespec deadline) {
/* pollset->mu already held */
int added_worker = 0;
+ int locked = 1;
/* this must happen before we (potentially) drop pollset->mu */
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
- if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
+ if (!grpc_pollset_has_workers(pollset) &&
+ !grpc_closure_list_empty(pollset->idle_jobs)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
goto done;
}
- if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
+ if (grpc_alarm_check(exec_ctx, now, &deadline)) {
+ gpr_mu_unlock(&pollset->mu);
+ locked = 0;
goto done;
}
if (pollset->shutting_down) {
@@ -190,19 +217,28 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
if (pollset->in_flight_cbs) {
/* Give do_promote priority so we don't starve it out */
gpr_mu_unlock(&pollset->mu);
- gpr_mu_lock(&pollset->mu);
+ locked = 0;
goto done;
}
if (!pollset->kicked_without_pollers) {
push_front_worker(pollset, worker);
added_worker = 1;
gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
- pollset->vtable->maybe_work(pollset, worker, deadline, now, 1);
+ gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker);
+ pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, deadline,
+ now);
+ locked = 0;
gpr_tls_set(&g_current_thread_poller, 0);
+ gpr_tls_set(&g_current_thread_worker, 0);
} else {
pollset->kicked_without_pollers = 0;
}
done:
+ if (!locked) {
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ locked = 1;
+ }
grpc_wakeup_fd_destroy(&worker->wakeup_fd);
if (added_worker) {
remove_worker(pollset, worker);
@@ -213,19 +249,24 @@ done:
} else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
pollset->called_shutdown = 1;
gpr_mu_unlock(&pollset->mu);
- finish_shutdown(pollset);
+ finish_shutdown(exec_ctx, pollset);
+ grpc_exec_ctx_flush(exec_ctx);
/* Continuing to access pollset here is safe -- it is the caller's
* responsibility to not destroy when it has outstanding calls to
* grpc_pollset_work.
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
gpr_mu_lock(&pollset->mu);
+ } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
+ gpr_mu_unlock(&pollset->mu);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
}
}
}
-void grpc_pollset_shutdown(grpc_pollset *pollset,
- void (*shutdown_done)(void *arg),
- void *shutdown_done_arg) {
+void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure) {
int call_shutdown = 0;
gpr_mu_lock(&pollset->mu);
GPR_ASSERT(!pollset->shutting_down);
@@ -235,13 +276,15 @@ void grpc_pollset_shutdown(grpc_pollset *pollset,
pollset->called_shutdown = 1;
call_shutdown = 1;
}
- pollset->shutdown_done_cb = shutdown_done;
- pollset->shutdown_done_arg = shutdown_done_arg;
+ if (!grpc_pollset_has_workers(pollset)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
+ }
+ pollset->shutdown_done = closure;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
gpr_mu_unlock(&pollset->mu);
if (call_shutdown) {
- finish_shutdown(pollset);
+ finish_shutdown(exec_ctx, pollset);
}
}
@@ -267,7 +310,7 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
}
timeout = gpr_time_sub(deadline, now);
return gpr_time_to_millis(gpr_time_add(
- timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN)));
+ timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
}
/*
@@ -279,15 +322,14 @@ typedef struct grpc_unary_promote_args {
const grpc_pollset_vtable *original_vtable;
grpc_pollset *pollset;
grpc_fd *fd;
- grpc_iomgr_closure promotion_closure;
+ grpc_closure promotion_closure;
} grpc_unary_promote_args;
-static void basic_do_promote(void *args, int success) {
+static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) {
grpc_unary_promote_args *up_args = args;
const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
grpc_pollset *pollset = up_args->pollset;
grpc_fd *fd = up_args->fd;
- int do_shutdown_cb = 0;
/*
* This is quite tricky. There are a number of cases to keep in mind here:
@@ -300,12 +342,7 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_lock(&pollset->mu);
/* First we need to ensure that nobody is polling concurrently */
- if (grpc_pollset_has_workers(pollset)) {
- grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
- grpc_iomgr_add_callback(&up_args->promotion_closure);
- gpr_mu_unlock(&pollset->mu);
- return;
- }
+ GPR_ASSERT(!grpc_pollset_has_workers(pollset));
gpr_free(up_args);
/* At this point the pollset may no longer be a unary poller. In that case
@@ -317,21 +354,20 @@ static void basic_do_promote(void *args, int success) {
if (pollset->shutting_down) {
/* We don't care about this pollset anymore. */
if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
- GPR_ASSERT(!grpc_pollset_has_workers(pollset));
- pollset->called_shutdown = 1;
- do_shutdown_cb = 1;
+ finish_shutdown(exec_ctx, pollset);
}
} else if (grpc_fd_is_orphaned(fd)) {
/* Don't try to add it to anything, we'll drop our ref on it below */
} else if (pollset->vtable != original_vtable) {
- pollset->vtable->add_fd(pollset, fd, 0);
+ pollset->vtable->add_fd(exec_ctx, pollset, fd, 0);
} else if (fd != pollset->data.ptr) {
grpc_fd *fds[2];
fds[0] = pollset->data.ptr;
fds[1] = fd;
if (fds[0] && !grpc_fd_is_orphaned(fds[0])) {
- grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds));
+ grpc_platform_become_multipoller(exec_ctx, pollset, fds,
+ GPR_ARRAY_SIZE(fds));
GRPC_FD_UNREF(fds[0], "basicpoll");
} else {
/* old fd is orphaned and we haven't cleaned it up until now, so remain a
@@ -346,16 +382,12 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_unlock(&pollset->mu);
- if (do_shutdown_cb) {
- pollset->shutdown_done_cb(pollset->shutdown_done_arg);
- }
-
/* Matching ref in basic_pollset_add_fd */
GRPC_FD_UNREF(fd, "basicpoll_add");
}
-static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
- int and_unlock_pollset) {
+static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd, int and_unlock_pollset) {
grpc_unary_promote_args *up_args;
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) goto exit;
@@ -372,7 +404,8 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
pollset->data.ptr = fd;
GRPC_FD_REF(fd, "basicpoll");
} else if (!grpc_fd_is_orphaned(fds[0])) {
- grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds));
+ grpc_platform_become_multipoller(exec_ctx, pollset, fds,
+ GPR_ARRAY_SIZE(fds));
GRPC_FD_UNREF(fds[0], "basicpoll");
} else {
/* old fd is orphaned and we haven't cleaned it up until now, so remain a
@@ -389,13 +422,13 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
GRPC_FD_REF(fd, "basicpoll_add");
pollset->in_flight_cbs++;
up_args = gpr_malloc(sizeof(*up_args));
- up_args->pollset = pollset;
up_args->fd = fd;
up_args->original_vtable = pollset->vtable;
+ up_args->pollset = pollset;
up_args->promotion_closure.cb = basic_do_promote;
up_args->promotion_closure.cb_arg = up_args;
- grpc_iomgr_add_callback(&up_args->promotion_closure);
+ grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
exit:
@@ -404,8 +437,8 @@ exit:
}
}
-static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
- int and_unlock_pollset) {
+static void basic_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd, int and_unlock_pollset) {
GPR_ASSERT(fd);
if (fd == pollset->data.ptr) {
GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
@@ -417,11 +450,12 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
}
}
-static void basic_pollset_maybe_work(grpc_pollset *pollset,
- grpc_pollset_worker *worker,
- gpr_timespec deadline, gpr_timespec now,
- int allow_synchronous_callback) {
- struct pollfd pfd[2];
+static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ grpc_pollset_worker *worker,
+ gpr_timespec deadline,
+ gpr_timespec now) {
+ struct pollfd pfd[3];
grpc_fd *fd;
grpc_fd_watcher fd_watcher;
int timeout;
@@ -434,31 +468,38 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
fd = pollset->data.ptr = NULL;
}
timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
- pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
pfd[0].events = POLLIN;
pfd[0].revents = 0;
- nfds = 1;
+ pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
+ pfd[1].events = POLLIN;
+ pfd[1].revents = 0;
+ nfds = 2;
if (fd) {
- pfd[1].fd = fd->fd;
- pfd[1].revents = 0;
+ pfd[2].fd = fd->fd;
+ pfd[2].revents = 0;
gpr_mu_unlock(&pollset->mu);
- pfd[1].events =
+ pfd[2].events =
(short)grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT, &fd_watcher);
- if (pfd[1].events != 0) {
+ if (pfd[2].events != 0) {
nfds++;
}
} else {
gpr_mu_unlock(&pollset->mu);
}
+ /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
+ even going into the blocking annotation if possible */
/* poll fd count (argument 2) is shortened by one if we have no events
to poll on - such that it only includes the kicker */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
r = grpc_poll_function(pfd, nfds, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
if (fd) {
- grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN,
- pfd[1].revents & POLLOUT);
+ grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN,
+ pfd[2].revents & POLLOUT);
}
if (r < 0) {
@@ -469,19 +510,20 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset,
/* do nothing */
} else {
if (pfd[0].revents & POLLIN) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ }
+ if (pfd[1].revents & POLLIN) {
grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
}
- if (nfds > 1) {
- if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) {
- grpc_fd_become_readable(fd, allow_synchronous_callback);
+ if (nfds > 2) {
+ if (pfd[2].revents & (POLLIN | POLLHUP | POLLERR)) {
+ grpc_fd_become_readable(exec_ctx, fd);
}
- if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) {
- grpc_fd_become_writable(fd, allow_synchronous_callback);
+ if (pfd[2].revents & (POLLOUT | POLLHUP | POLLERR)) {
+ grpc_fd_become_writable(exec_ctx, fd);
}
}
}
-
- gpr_mu_lock(&pollset->mu);
}
static void basic_pollset_destroy(grpc_pollset *pollset) {
@@ -492,8 +534,9 @@ static void basic_pollset_destroy(grpc_pollset *pollset) {
}
static const grpc_pollset_vtable basic_pollset = {
- basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work,
- basic_pollset_destroy, basic_pollset_destroy};
+ basic_pollset_add_fd, basic_pollset_del_fd,
+ basic_pollset_maybe_work_and_unlock, basic_pollset_destroy,
+ basic_pollset_destroy};
static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
pollset->vtable = &basic_pollset;