diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.c')
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.c | 361 |
1 files changed, 127 insertions, 234 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 7cb6085e25..5574838187 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -1,33 +1,18 @@ /* * - * Copyright 2017, Google Inc. - * All rights reserved. + * Copyright 2017 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -59,7 +44,6 @@ #include "src/core/lib/iomgr/sys_epoll_wrapper.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/spinlock.h" @@ -139,17 +123,6 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; - /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ - grpc_wakeup_fd workqueue_wakeup_fd; - grpc_closure_scheduler workqueue_scheduler; - /* Spinlock guarding the read end of the workqueue (must be held to pop from - * workqueue_items) */ - gpr_spinlock workqueue_read_mu; - /* Queue of closures to be executed */ - gpr_mpscq workqueue_items; - /* Count of items in workqueue_items */ - gpr_atm workqueue_item_count; - /* The fd is either closed or we relinquished control of it. In either cases, this indicates that the 'fd' on this structure is no longer valid */ @@ -172,12 +145,6 @@ struct grpc_fd { static void fd_global_init(void); static void fd_global_shutdown(void); -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); - -static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { - workqueue_enqueue, workqueue_enqueue, "workqueue"}; - /******************************************************************************* * Pollset Declarations */ @@ -202,12 +169,20 @@ struct grpc_pollset_worker { pollable *pollable; }; +#define MAX_EPOLL_EVENTS 100 +#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 + struct grpc_pollset { pollable pollable; pollable *current_pollable; + int kick_alls_pending; bool kicked_without_poller; grpc_closure *shutdown_closure; grpc_pollset_worker *root_worker; + + int event_cursor; + int event_count; + struct epoll_event events[MAX_EPOLL_EVENTS]; }; /******************************************************************************* @@ -256,15 +231,18 @@ static bool append_error(grpc_error **composite, grpc_error *error, static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; -#ifdef GRPC_FD_REF_COUNT_DEBUG +#ifndef NDEBUG #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__) #define UNREF_BY(ec, fd, n, reason) \ unref_by(ec, fd, n, reason, __FILE__, __LINE__) static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, int line) { - gpr_log(GPR_DEBUG, "FD %d %p ref %d %ld -> %ld [%s; %s:%d]", fd->fd, - (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst), - gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); + if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { + gpr_log(GPR_DEBUG, + "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]", + fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), + gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line); + } #else #define REF_BY(fd, n, reason) ref_by(fd, n) #define UNREF_BY(ec, fd, n, reason) unref_by(ec, fd, n) @@ -289,20 +267,21 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_mu_unlock(&fd_freelist_mu); } -#ifdef GRPC_FD_REF_COUNT_DEBUG +#ifndef NDEBUG static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n, const char *reason, const char *file, int line) { - gpr_atm old; - gpr_log(GPR_DEBUG, "FD %d %p unref %d %ld -> %ld [%s; %s:%d]", fd->fd, - (void *)fd, n, gpr_atm_no_barrier_load(&fd->refst), - gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); + if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { + gpr_log(GPR_DEBUG, + "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]", + fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst), + gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); + } #else static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n) { - gpr_atm old; #endif - old = gpr_atm_full_fetch_add(&fd->refst, -n); + gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { - grpc_closure_sched(exec_ctx, grpc_closure_create(fd_destroy, fd, + GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(fd_destroy, fd, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } else { @@ -347,21 +326,16 @@ static grpc_fd *fd_create(int fd, const char *name) { grpc_lfev_init(&new_fd->write_closure); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); - GRPC_LOG_IF_ERROR("fd_create", - grpc_wakeup_fd_init(&new_fd->workqueue_wakeup_fd)); - new_fd->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; - new_fd->workqueue_read_mu = GPR_SPINLOCK_INITIALIZER; - gpr_mpscq_init(&new_fd->workqueue_items); - gpr_atm_no_barrier_store(&new_fd->workqueue_item_count, 0); - new_fd->freelist_next = NULL; new_fd->on_done_closure = NULL; char *fd_name; gpr_asprintf(&fd_name, "%s fd=%d", name, fd); grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name); -#ifdef GRPC_FD_REF_COUNT_DEBUG - gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name); +#ifndef NDEBUG + if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { + gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name); + } #endif gpr_free(fd_name); return new_fd; @@ -407,7 +381,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, to be alive (and not added to freelist) until the end of this function */ REF_BY(fd, 1, reason); - grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->orphaned_mu); gpr_mu_unlock(&fd->pollable.po.mu); @@ -446,91 +420,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { - REF_BY(fd, 2, "return_workqueue"); - return (grpc_workqueue *)fd; -} - -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - if (workqueue != NULL) { - ref_by((grpc_fd *)workqueue, 2, file, line, reason); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - if (workqueue != NULL) { - unref_by(exec_ctx, (grpc_fd *)workqueue, 2, file, line, reason); - } -} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue != NULL) { - ref_by((grpc_fd *)workqueue, 2); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (workqueue != NULL) { - unref_by(exec_ctx, (grpc_fd *)workqueue, 2); - } -} -#endif - -static void workqueue_wakeup(grpc_fd *fd) { - GRPC_LOG_IF_ERROR("workqueue_enqueue", - grpc_wakeup_fd_wakeup(&fd->workqueue_wakeup_fd)); -} - -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - grpc_fd *fd = (grpc_fd *)(((char *)closure->scheduler) - - offsetof(grpc_fd, workqueue_scheduler)); - REF_BY(fd, 2, "workqueue_enqueue"); - gpr_atm last = gpr_atm_no_barrier_fetch_add(&fd->workqueue_item_count, 1); - closure->error_data.error = error; - gpr_mpscq_push(&fd->workqueue_items, &closure->next_data.atm_next); - if (last == 0) { - workqueue_wakeup(fd); - } - UNREF_BY(exec_ctx, fd, 2, "workqueue_enqueue"); -} - -static void fd_invoke_workqueue(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - /* handle spurious wakeups */ - if (!gpr_spinlock_trylock(&fd->workqueue_read_mu)) return; - gpr_mpscq_node *n = gpr_mpscq_pop(&fd->workqueue_items); - gpr_spinlock_unlock(&fd->workqueue_read_mu); - if (n != NULL) { - if (gpr_atm_full_fetch_add(&fd->workqueue_item_count, -1) > 1) { - workqueue_wakeup(fd); - } - grpc_closure *c = (grpc_closure *)n; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - } else if (gpr_atm_no_barrier_load(&fd->workqueue_item_count) > 0) { - /* n == NULL might mean there's work but it's not available to be popped - * yet - try to ensure another workqueue wakes up to check shortly if so - */ - workqueue_wakeup(fd); - } -} - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - return &((grpc_fd *)workqueue)->workqueue_scheduler; -} - /******************************************************************************* * Pollable Definitions */ @@ -562,7 +451,7 @@ static grpc_error *pollable_materialize(pollable *p) { return err; } struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET), - .data.ptr = &p->wakeup}; + .data.ptr = (void *)(1 | (intptr_t)&p->wakeup)}; if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) { err = GRPC_OS_ERROR(errno, "epoll_ctl"); close(new_epfd); @@ -596,22 +485,7 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { .data.ptr = fd}; if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { - case EEXIST: /* if this fd is already in the epoll set, the workqueue fd - must also be - just return */ - gpr_mu_unlock(&fd->orphaned_mu); - return GRPC_ERROR_NONE; - default: - append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc); - } - } - struct epoll_event ev_wq = { - .events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLEXCLUSIVE), - .data.ptr = (void *)(1 + (intptr_t)fd)}; - if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->workqueue_wakeup_fd.read_fd, &ev_wq) != - 0) { - switch (errno) { - case EEXIST: /* if the workqueue fd is already in the epoll set we're ok - - no need to do anything special */ + case EEXIST: break; default: append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc); @@ -643,8 +517,20 @@ static void pollset_global_shutdown(void) { gpr_tls_destroy(&g_current_thread_worker); } -static grpc_error *pollset_kick_all(grpc_pollset *pollset) { +static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset) { + if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && + pollset->kick_alls_pending == 0) { + GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); + pollset->shutdown_closure = NULL; + } +} + +static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_unused) { grpc_error *error = GRPC_ERROR_NONE; + grpc_pollset *pollset = arg; + gpr_mu_lock(&pollset->pollable.po.mu); if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; do { @@ -665,7 +551,17 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { worker = worker->links[PWL_POLLSET].next; } while (worker != pollset->root_worker); } - return error; + pollset->kick_alls_pending--; + pollset_maybe_finish_shutdown(exec_ctx, pollset); + gpr_mu_unlock(&pollset->pollable.po.mu); + GRPC_LOG_IF_ERROR("kick_all", error); +} + +static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + pollset->kick_alls_pending++; + GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(do_kick_all, pollset, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, @@ -804,20 +700,12 @@ static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { return error; } -static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset) { - if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) { - grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); - pollset->shutdown_closure = NULL; - } -} - /* pollset->po.mu lock must be held by the caller before calling this */ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(pollset->shutdown_closure == NULL); pollset->shutdown_closure = closure; - GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset)); + pollset_kick_all(exec_ctx, pollset); pollset_maybe_finish_shutdown(exec_ctx, pollset); } @@ -825,6 +713,46 @@ static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { return p != &g_empty_pollable && p != &pollset->pollable; } +static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, bool drain) { + static const char *err_desc = "pollset_process_events"; + grpc_error *error = GRPC_ERROR_NONE; + for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && + pollset->event_cursor != pollset->event_count; + i++) { + int n = pollset->event_cursor++; + struct epoll_event *ev = &pollset->events[n]; + void *data_ptr = ev->data.ptr; + if (1 & (intptr_t)data_ptr) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr); + } + append_error(&error, grpc_wakeup_fd_consume_wakeup( + (void *)((~(intptr_t)1) & (intptr_t)data_ptr)), + err_desc); + } else { + grpc_fd *fd = (grpc_fd *)data_ptr; + bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; + bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; + bool write_ev = (ev->events & EPOLLOUT) != 0; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, + "PS:%p got fd %p: cancel=%d read=%d " + "write=%d", + pollset, fd, cancel, read_ev, write_ev); + } + if (read_ev || cancel) { + fd_become_readable(exec_ctx, fd, pollset); + } + if (write_ev || cancel) { + fd_become_writable(exec_ctx, fd); + } + } + } + + return error; +} + /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { pollable_destroy(&pollset->pollable); @@ -832,16 +760,13 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, "pollset_pollable"); } + GRPC_LOG_IF_ERROR("pollset_process_events", + pollset_process_events(exec_ctx, pollset, true)); } -#define MAX_EPOLL_EVENTS 100 - static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollable *p, gpr_timespec now, gpr_timespec deadline) { - struct epoll_event events[MAX_EPOLL_EVENTS]; - static const char *err_desc = "pollset_poll"; - int timeout = poll_deadline_to_millis_timeout(deadline, now); if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -853,7 +778,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } int r; do { - r = epoll_wait(p->epfd, events, MAX_EPOLL_EVENTS, timeout); + r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { GRPC_SCHEDULING_END_BLOCKING_REGION; @@ -865,43 +790,10 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); } - grpc_error *error = GRPC_ERROR_NONE; - for (int i = 0; i < r; i++) { - void *data_ptr = events[i].data.ptr; - if (data_ptr == &p->wakeup) { - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p poll %p got pollset_wakeup", pollset, p); - } - append_error(&error, grpc_wakeup_fd_consume_wakeup(&p->wakeup), err_desc); - } else { - grpc_fd *fd = (grpc_fd *)(((intptr_t)data_ptr) & ~(intptr_t)1); - bool is_workqueue = (((intptr_t)data_ptr) & 1) != 0; - bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0; - bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0; - bool write_ev = (events[i].events & EPOLLOUT) != 0; - if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, - "PS:%p poll %p got fd %p: is_wq=%d cancel=%d read=%d " - "write=%d", - pollset, p, fd, is_workqueue, cancel, read_ev, write_ev); - } - if (is_workqueue) { - append_error(&error, - grpc_wakeup_fd_consume_wakeup(&fd->workqueue_wakeup_fd), - err_desc); - fd_invoke_workqueue(exec_ctx, fd); - } else { - if (read_ev || cancel) { - fd_become_readable(exec_ctx, fd, pollset); - } - if (write_ev || cancel) { - fd_become_writable(exec_ctx, fd); - } - } - } - } + pollset->event_cursor = 0; + pollset->event_count = r; - return error; + return GRPC_ERROR_NONE; } /* Return true if first in list */ @@ -1053,10 +945,13 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_unlock(&worker.pollable->po.mu); } gpr_mu_unlock(&pollset->pollable.po.mu); - append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, now, - deadline), + if (pollset->event_cursor == pollset->event_count) { + append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, + now, deadline), + err_desc); + } + append_error(&error, pollset_process_events(exec_ctx, pollset, false), err_desc); - grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->pollable.po.mu); if (worker.pollable != &pollset->pollable) { gpr_mu_lock(&worker.pollable->po.mu); @@ -1069,6 +964,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (worker.pollable != &pollset->pollable) { gpr_mu_unlock(&worker.pollable->po.mu); } + if (grpc_exec_ctx_has_work(exec_ctx)) { + gpr_mu_unlock(&pollset->pollable.po.mu); + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&pollset->pollable.po.mu); + } return error; } @@ -1090,7 +990,7 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, "PS:%p add fd %p; transition pollable from empty to fd", pollset, fd); /* empty pollable --> single fd pollable */ - append_error(&error, pollset_kick_all(pollset), err_desc); + pollset_kick_all(exec_ctx, pollset); pollset->current_pollable = &fd->pollable; if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu); append_error(&error, fd_become_pollable_locked(fd), err_desc); @@ -1107,15 +1007,15 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from fd %p to multipoller", pollset, fd, had_fd); - append_error(&error, pollset_kick_all(pollset), err_desc); + pollset_kick_all(exec_ctx, pollset); pollset->current_pollable = &pollset->pollable; if (append_error(&error, pollable_materialize(&pollset->pollable), err_desc)) { pollable_add_fd(&pollset->pollable, had_fd); pollable_add_fd(&pollset->pollable, fd); } - grpc_closure_sched(exec_ctx, - grpc_closure_create(unref_fd_no_longer_poller, had_fd, + GRPC_CLOSURE_SCHED(exec_ctx, + GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -1449,7 +1349,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -1467,17 +1366,11 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; const grpc_event_engine_vtable *grpc_init_epollex_linux( bool explicitly_requested) { - if (!explicitly_requested) return NULL; - if (!grpc_has_wakeup_fd()) { return NULL; } |