aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c146
-rw-r--r--src/core/lib/iomgr/socket_factory_posix.c9
-rw-r--r--src/core/lib/iomgr/socket_mutator.c10
3 files changed, 100 insertions, 65 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index abcf7b429f..949f8a845d 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -169,12 +169,20 @@ struct grpc_pollset_worker {
pollable *pollable;
};
+#define MAX_EPOLL_EVENTS 100
+#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
+
struct grpc_pollset {
pollable pollable;
pollable *current_pollable;
+ int kick_alls_pending;
bool kicked_without_poller;
grpc_closure *shutdown_closure;
grpc_pollset_worker *root_worker;
+
+ int event_cursor;
+ int event_count;
+ struct epoll_event events[MAX_EPOLL_EVENTS];
};
/*******************************************************************************
@@ -437,7 +445,7 @@ static grpc_error *pollable_materialize(pollable *p) {
return err;
}
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
- .data.ptr = &p->wakeup};
+ .data.ptr = (void *)(1 | (intptr_t)&p->wakeup)};
if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) {
err = GRPC_OS_ERROR(errno, "epoll_ctl");
close(new_epfd);
@@ -503,8 +511,20 @@ static void pollset_global_shutdown(void) {
gpr_tls_destroy(&g_current_thread_worker);
}
-static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
+static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset) {
+ if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
+ pollset->kick_alls_pending == 0) {
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
+ pollset->shutdown_closure = NULL;
+ }
+}
+
+static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error_unused) {
grpc_error *error = GRPC_ERROR_NONE;
+ grpc_pollset *pollset = arg;
+ gpr_mu_lock(&pollset->pollable.po.mu);
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
@@ -525,7 +545,17 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
worker = worker->links[PWL_POLLSET].next;
} while (worker != pollset->root_worker);
}
- return error;
+ pollset->kick_alls_pending--;
+ pollset_maybe_finish_shutdown(exec_ctx, pollset);
+ gpr_mu_unlock(&pollset->pollable.po.mu);
+ GRPC_LOG_IF_ERROR("kick_all", error);
+}
+
+static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
+ pollset->kick_alls_pending++;
+ GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(do_kick_all, pollset,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
}
static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
@@ -664,20 +694,12 @@ static grpc_error *fd_become_pollable_locked(grpc_fd *fd) {
return error;
}
-static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset) {
- if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) {
- GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
- pollset->shutdown_closure = NULL;
- }
-}
-
/* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(pollset->shutdown_closure == NULL);
pollset->shutdown_closure = closure;
- GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
+ pollset_kick_all(exec_ctx, pollset);
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
@@ -685,6 +707,46 @@ static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) {
return p != &g_empty_pollable && p != &pollset->pollable;
}
+static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset, bool drain) {
+ static const char *err_desc = "pollset_process_events";
+ grpc_error *error = GRPC_ERROR_NONE;
+ for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) &&
+ pollset->event_cursor != pollset->event_count;
+ i++) {
+ int n = pollset->event_cursor++;
+ struct epoll_event *ev = &pollset->events[n];
+ void *data_ptr = ev->data.ptr;
+ if (1 & (intptr_t)data_ptr) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr);
+ }
+ append_error(&error, grpc_wakeup_fd_consume_wakeup(
+ (void *)((~(intptr_t)1) & (intptr_t)data_ptr)),
+ err_desc);
+ } else {
+ grpc_fd *fd = (grpc_fd *)data_ptr;
+ bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+ bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
+ bool write_ev = (ev->events & EPOLLOUT) != 0;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG,
+ "PS:%p got fd %p: cancel=%d read=%d "
+ "write=%d",
+ pollset, fd, cancel, read_ev, write_ev);
+ }
+ if (read_ev || cancel) {
+ fd_become_readable(exec_ctx, fd, pollset);
+ }
+ if (write_ev || cancel) {
+ fd_become_writable(exec_ctx, fd);
+ }
+ }
+ }
+
+ return error;
+}
+
/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
pollable_destroy(&pollset->pollable);
@@ -692,16 +754,13 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2,
"pollset_pollable");
}
+ GRPC_LOG_IF_ERROR("pollset_process_events",
+ pollset_process_events(exec_ctx, pollset, true));
}
-#define MAX_EPOLL_EVENTS 100
-
static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollable *p, gpr_timespec now,
gpr_timespec deadline) {
- struct epoll_event events[MAX_EPOLL_EVENTS];
- static const char *err_desc = "pollset_poll";
-
int timeout = poll_deadline_to_millis_timeout(deadline, now);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
@@ -713,7 +772,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
int r;
do {
- r = epoll_wait(p->epfd, events, MAX_EPOLL_EVENTS, timeout);
+ r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout);
} while (r < 0 && errno == EINTR);
if (timeout != 0) {
GRPC_SCHEDULING_END_BLOCKING_REGION;
@@ -725,35 +784,10 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r);
}
- grpc_error *error = GRPC_ERROR_NONE;
- for (int i = 0; i < r; i++) {
- void *data_ptr = events[i].data.ptr;
- if (data_ptr == &p->wakeup) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "PS:%p poll %p got pollset_wakeup", pollset, p);
- }
- append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc);
- } else {
- grpc_fd *fd = (grpc_fd *)data_ptr;
- bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
- bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
- bool write_ev = (events[i].events & EPOLLOUT) != 0;
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG,
- "PS:%p poll %p got fd %p: cancel=%d read=%d "
- "write=%d",
- pollset, p, fd, cancel, read_ev, write_ev);
- }
- if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd, pollset);
- }
- if (write_ev || cancel) {
- fd_become_writable(exec_ctx, fd);
- }
- }
- }
+ pollset->event_cursor = 0;
+ pollset->event_count = r;
- return error;
+ return GRPC_ERROR_NONE;
}
/* Return true if first in list */
@@ -905,10 +939,13 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_unlock(&worker.pollable->po.mu);
}
gpr_mu_unlock(&pollset->pollable.po.mu);
- append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, now,
- deadline),
+ if (pollset->event_cursor == pollset->event_count) {
+ append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable,
+ now, deadline),
+ err_desc);
+ }
+ append_error(&error, pollset_process_events(exec_ctx, pollset, false),
err_desc);
- grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->pollable.po.mu);
if (worker.pollable != &pollset->pollable) {
gpr_mu_lock(&worker.pollable->po.mu);
@@ -921,6 +958,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (worker.pollable != &pollset->pollable) {
gpr_mu_unlock(&worker.pollable->po.mu);
}
+ if (grpc_exec_ctx_has_work(exec_ctx)) {
+ gpr_mu_unlock(&pollset->pollable.po.mu);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->pollable.po.mu);
+ }
return error;
}
@@ -942,7 +984,7 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
"PS:%p add fd %p; transition pollable from empty to fd", pollset,
fd);
/* empty pollable --> single fd pollable */
- append_error(&error, pollset_kick_all(pollset), err_desc);
+ pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &fd->pollable;
if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu);
append_error(&error, fd_become_pollable_locked(fd), err_desc);
@@ -959,7 +1001,7 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG,
"PS:%p add fd %p; transition pollable from fd %p to multipoller",
pollset, fd, had_fd);
- append_error(&error, pollset_kick_all(pollset), err_desc);
+ pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &pollset->pollable;
if (append_error(&error, pollable_materialize(&pollset->pollable),
err_desc)) {
@@ -1323,8 +1365,6 @@ static const grpc_event_engine_vtable vtable = {
const grpc_event_engine_vtable *grpc_init_epollex_linux(
bool explicitly_requested) {
- if (!explicitly_requested) return NULL;
-
if (!grpc_has_wakeup_fd()) {
return NULL;
}
diff --git a/src/core/lib/iomgr/socket_factory_posix.c b/src/core/lib/iomgr/socket_factory_posix.c
index 7d25bc1265..0f82dea570 100644
--- a/src/core/lib/iomgr/socket_factory_posix.c
+++ b/src/core/lib/iomgr/socket_factory_posix.c
@@ -20,6 +20,7 @@
#ifdef GRPC_POSIX_SOCKET
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/socket_factory_posix.h"
#include <grpc/impl/codegen/grpc_types.h>
@@ -84,12 +85,8 @@ static const grpc_arg_pointer_vtable socket_factory_arg_vtable = {
socket_factory_arg_copy, socket_factory_arg_destroy, socket_factory_cmp};
grpc_arg grpc_socket_factory_to_arg(grpc_socket_factory *factory) {
- grpc_arg arg;
- arg.type = GRPC_ARG_POINTER;
- arg.key = GRPC_ARG_SOCKET_FACTORY;
- arg.value.pointer.vtable = &socket_factory_arg_vtable;
- arg.value.pointer.p = factory;
- return arg;
+ return grpc_channel_arg_pointer_create(GRPC_ARG_SOCKET_FACTORY, factory,
+ &socket_factory_arg_vtable);
}
#endif
diff --git a/src/core/lib/iomgr/socket_mutator.c b/src/core/lib/iomgr/socket_mutator.c
index c4b9a0930b..5d6c2c400e 100644
--- a/src/core/lib/iomgr/socket_mutator.c
+++ b/src/core/lib/iomgr/socket_mutator.c
@@ -18,6 +18,8 @@
#include "src/core/lib/iomgr/socket_mutator.h"
+#include "src/core/lib/channel/channel_args.h"
+
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
@@ -74,10 +76,6 @@ static const grpc_arg_pointer_vtable socket_mutator_arg_vtable = {
socket_mutator_arg_copy, socket_mutator_arg_destroy, socket_mutator_cmp};
grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator) {
- grpc_arg arg;
- arg.type = GRPC_ARG_POINTER;
- arg.key = GRPC_ARG_SOCKET_MUTATOR;
- arg.value.pointer.vtable = &socket_mutator_arg_vtable;
- arg.value.pointer.p = mutator;
- return arg;
+ return grpc_channel_arg_pointer_create(GRPC_ARG_SOCKET_MUTATOR, mutator,
+ &socket_mutator_arg_vtable);
}