diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.c | 146 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_factory_posix.c | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/socket_mutator.c | 10 |
3 files changed, 100 insertions, 65 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index abcf7b429f..949f8a845d 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -169,12 +169,20 @@ struct grpc_pollset_worker { pollable *pollable; }; +#define MAX_EPOLL_EVENTS 100 +#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 + struct grpc_pollset { pollable pollable; pollable *current_pollable; + int kick_alls_pending; bool kicked_without_poller; grpc_closure *shutdown_closure; grpc_pollset_worker *root_worker; + + int event_cursor; + int event_count; + struct epoll_event events[MAX_EPOLL_EVENTS]; }; /******************************************************************************* @@ -437,7 +445,7 @@ static grpc_error *pollable_materialize(pollable *p) { return err; } struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET), - .data.ptr = &p->wakeup}; + .data.ptr = (void *)(1 | (intptr_t)&p->wakeup)}; if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) { err = GRPC_OS_ERROR(errno, "epoll_ctl"); close(new_epfd); @@ -503,8 +511,20 @@ static void pollset_global_shutdown(void) { gpr_tls_destroy(&g_current_thread_worker); } -static grpc_error *pollset_kick_all(grpc_pollset *pollset) { +static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset) { + if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && + pollset->kick_alls_pending == 0) { + GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); + pollset->shutdown_closure = NULL; + } +} + +static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_unused) { grpc_error *error = GRPC_ERROR_NONE; + grpc_pollset *pollset = arg; + gpr_mu_lock(&pollset->pollable.po.mu); if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; do { @@ -525,7 +545,17 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { worker = worker->links[PWL_POLLSET].next; } while (worker != pollset->root_worker); } - return error; + pollset->kick_alls_pending--; + pollset_maybe_finish_shutdown(exec_ctx, pollset); + gpr_mu_unlock(&pollset->pollable.po.mu); + GRPC_LOG_IF_ERROR("kick_all", error); +} + +static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + pollset->kick_alls_pending++; + GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(do_kick_all, pollset, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, @@ -664,20 +694,12 @@ static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { return error; } -static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset) { - if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) { - GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); - pollset->shutdown_closure = NULL; - } -} - /* pollset->po.mu lock must be held by the caller before calling this */ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(pollset->shutdown_closure == NULL); pollset->shutdown_closure = closure; - GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset)); + pollset_kick_all(exec_ctx, pollset); pollset_maybe_finish_shutdown(exec_ctx, pollset); } @@ -685,6 +707,46 @@ static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { return p != &g_empty_pollable && p != &pollset->pollable; } +static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, bool drain) { + static const char *err_desc = "pollset_process_events"; + grpc_error *error = GRPC_ERROR_NONE; + for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && + pollset->event_cursor != pollset->event_count; + i++) { + int n = pollset->event_cursor++; + struct epoll_event *ev = &pollset->events[n]; + void *data_ptr = ev->data.ptr; + if (1 & (intptr_t)data_ptr) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr); + } + append_error(&error, grpc_wakeup_fd_consume_wakeup( + (void *)((~(intptr_t)1) & (intptr_t)data_ptr)), + err_desc); + } else { + grpc_fd *fd = (grpc_fd *)data_ptr; + bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; + bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; + bool write_ev = (ev->events & EPOLLOUT) != 0; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, + "PS:%p got fd %p: cancel=%d read=%d " + "write=%d", + pollset, fd, cancel, read_ev, write_ev); + } + if (read_ev || cancel) { + fd_become_readable(exec_ctx, fd, pollset); + } + if (write_ev || cancel) { + fd_become_writable(exec_ctx, fd); + } + } + } + + return error; +} + /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { pollable_destroy(&pollset->pollable); @@ -692,16 +754,13 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, "pollset_pollable"); } + GRPC_LOG_IF_ERROR("pollset_process_events", + pollset_process_events(exec_ctx, pollset, true)); } -#define MAX_EPOLL_EVENTS 100 - static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollable *p, gpr_timespec now, gpr_timespec deadline) { - struct epoll_event events[MAX_EPOLL_EVENTS]; - static const char *err_desc = "pollset_poll"; - int timeout = poll_deadline_to_millis_timeout(deadline, now); if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -713,7 +772,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } int r; do { - r = epoll_wait(p->epfd, events, MAX_EPOLL_EVENTS, timeout); + r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { GRPC_SCHEDULING_END_BLOCKING_REGION; @@ -725,35 +784,10 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); } - grpc_error *error = GRPC_ERROR_NONE; - for (int i = 0; i < r; i++) { - void *data_ptr = events[i].data.ptr; - if (data_ptr == &p->wakeup) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p poll %p got pollset_wakeup", pollset, p); - } - append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc); - } else { - grpc_fd *fd = (grpc_fd *)data_ptr; - bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0; - bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0; - bool write_ev = (events[i].events & EPOLLOUT) != 0; - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, - "PS:%p poll %p got fd %p: cancel=%d read=%d " - "write=%d", - pollset, p, fd, cancel, read_ev, write_ev); - } - if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd, pollset); - } - if (write_ev || cancel) { - fd_become_writable(exec_ctx, fd); - } - } - } + pollset->event_cursor = 0; + pollset->event_count = r; - return error; + return GRPC_ERROR_NONE; } /* Return true if first in list */ @@ -905,10 +939,13 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_unlock(&worker.pollable->po.mu); } gpr_mu_unlock(&pollset->pollable.po.mu); - append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, now, - deadline), + if (pollset->event_cursor == pollset->event_count) { + append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, + now, deadline), + err_desc); + } + append_error(&error, pollset_process_events(exec_ctx, pollset, false), err_desc); - grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->pollable.po.mu); if (worker.pollable != &pollset->pollable) { gpr_mu_lock(&worker.pollable->po.mu); @@ -921,6 +958,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (worker.pollable != &pollset->pollable) { gpr_mu_unlock(&worker.pollable->po.mu); } + if (grpc_exec_ctx_has_work(exec_ctx)) { + gpr_mu_unlock(&pollset->pollable.po.mu); + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->pollable.po.mu); + } return error; } @@ -942,7 +984,7 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, "PS:%p add fd %p; transition pollable from empty to fd", pollset, fd); /* empty pollable --> single fd pollable */ - append_error(&error, pollset_kick_all(pollset), err_desc); + pollset_kick_all(exec_ctx, pollset); pollset->current_pollable = &fd->pollable; if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu); append_error(&error, fd_become_pollable_locked(fd), err_desc); @@ -959,7 +1001,7 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from fd %p to multipoller", pollset, fd, had_fd); - append_error(&error, pollset_kick_all(pollset), err_desc); + pollset_kick_all(exec_ctx, pollset); pollset->current_pollable = &pollset->pollable; if (append_error(&error, pollable_materialize(&pollset->pollable), err_desc)) { @@ -1323,8 +1365,6 @@ static const grpc_event_engine_vtable vtable = { const grpc_event_engine_vtable *grpc_init_epollex_linux( bool explicitly_requested) { - if (!explicitly_requested) return NULL; - if (!grpc_has_wakeup_fd()) { return NULL; } diff --git a/src/core/lib/iomgr/socket_factory_posix.c b/src/core/lib/iomgr/socket_factory_posix.c index 7d25bc1265..0f82dea570 100644 --- a/src/core/lib/iomgr/socket_factory_posix.c +++ b/src/core/lib/iomgr/socket_factory_posix.c @@ -20,6 +20,7 @@ #ifdef GRPC_POSIX_SOCKET +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/socket_factory_posix.h" #include <grpc/impl/codegen/grpc_types.h> @@ -84,12 +85,8 @@ static const grpc_arg_pointer_vtable socket_factory_arg_vtable = { socket_factory_arg_copy, socket_factory_arg_destroy, socket_factory_cmp}; grpc_arg grpc_socket_factory_to_arg(grpc_socket_factory *factory) { - grpc_arg arg; - arg.type = GRPC_ARG_POINTER; - arg.key = GRPC_ARG_SOCKET_FACTORY; - arg.value.pointer.vtable = &socket_factory_arg_vtable; - arg.value.pointer.p = factory; - return arg; + return grpc_channel_arg_pointer_create(GRPC_ARG_SOCKET_FACTORY, factory, + &socket_factory_arg_vtable); } #endif diff --git a/src/core/lib/iomgr/socket_mutator.c b/src/core/lib/iomgr/socket_mutator.c index c4b9a0930b..5d6c2c400e 100644 --- a/src/core/lib/iomgr/socket_mutator.c +++ b/src/core/lib/iomgr/socket_mutator.c @@ -18,6 +18,8 @@ #include "src/core/lib/iomgr/socket_mutator.h" +#include "src/core/lib/channel/channel_args.h" + #include <grpc/impl/codegen/grpc_types.h> #include <grpc/support/sync.h> #include <grpc/support/useful.h> @@ -74,10 +76,6 @@ static const grpc_arg_pointer_vtable socket_mutator_arg_vtable = { socket_mutator_arg_copy, socket_mutator_arg_destroy, socket_mutator_cmp}; grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator) { - grpc_arg arg; - arg.type = GRPC_ARG_POINTER; - arg.key = GRPC_ARG_SOCKET_MUTATOR; - arg.value.pointer.vtable = &socket_mutator_arg_vtable; - arg.value.pointer.p = mutator; - return arg; + return grpc_channel_arg_pointer_create(GRPC_ARG_SOCKET_MUTATOR, mutator, + &socket_mutator_arg_vtable); } |