diff options
author | 2017-09-18 08:57:19 -0700 | |
---|---|---|
committer | 2017-09-18 08:57:19 -0700 | |
commit | adbcec4f57f7e25859326a6daf8ad7e08f2805ca (patch) | |
tree | c029f503127946f0564134d343c8af9f88ac8da5 /src/core/lib/iomgr/ev_epollex_linux.c | |
parent | 7f10c299c7eb5d6688c20ae43c63b0548ca8acd1 (diff) | |
parent | a5f46e29bcee3068f5ec6691b7e06cf944430881 (diff) |
Merge github.com:grpc/grpc into pollset_kick_stats
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.c')
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.c | 203 |
1 files changed, 103 insertions, 100 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 4e5bed5016..2a276ca360 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -97,12 +97,12 @@ static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, * pollable Declarations */ -typedef struct pollable_t { +typedef struct pollable { polling_obj po; int epfd; grpc_wakeup_fd wakeup; grpc_pollset_worker *root_worker; -} pollable_t; +} pollable; static const char *polling_obj_type_string(polling_obj_type t) { switch (t) { @@ -122,7 +122,7 @@ static const char *polling_obj_type_string(polling_obj_type t) { return "<invalid>"; } -static char *pollable_desc(pollable_t *p) { +static char *pollable_desc(pollable *p) { char *out; gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d", polling_obj_type_string(p->po.type), p->po.group, p->epfd, @@ -130,19 +130,19 @@ static char *pollable_desc(pollable_t *p) { return out; } -static pollable_t g_empty_pollable; +static pollable g_empty_pollable; -static void pollable_init(pollable_t *p, polling_obj_type type); -static void pollable_destroy(pollable_t *p); +static void pollable_init(pollable *p, polling_obj_type type); +static void pollable_destroy(pollable *p); /* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable_t *p); +static grpc_error *pollable_materialize(pollable *p); /******************************************************************************* * Fd Declarations */ struct grpc_fd { - pollable_t pollable; + pollable pollable_obj; int fd; /* refst format: bit 0 : 1=Active / 0=Orphaned @@ -193,15 +193,15 @@ struct grpc_pollset_worker { pollset_worker_link links[POLLSET_WORKER_LINK_COUNT]; gpr_cv cv; grpc_pollset *pollset; - pollable_t *pollable; + pollable *pollable_obj; }; #define MAX_EPOLL_EVENTS 100 #define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 struct grpc_pollset { - pollable_t pollable; - pollable_t *current_pollable; + pollable pollable_obj; + pollable *current_pollable_obj; int kick_alls_pending; bool kicked_without_poller; grpc_closure *shutdown_closure; @@ -282,7 +282,7 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_fd *fd = (grpc_fd *)arg; /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); - pollable_destroy(&fd->pollable); + pollable_destroy(&fd->pollable_obj); gpr_mu_destroy(&fd->orphaned_mu); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -343,7 +343,7 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); } - pollable_init(&new_fd->pollable, PO_FD); + pollable_init(&new_fd->pollable_obj, PO_FD); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; @@ -385,7 +385,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, bool is_fd_closed = already_closed; grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_lock(&fd->pollable.po.mu); + gpr_mu_lock(&fd->pollable_obj.po.mu); gpr_mu_lock(&fd->orphaned_mu); fd->on_done_closure = on_done; @@ -411,7 +411,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->orphaned_mu); - gpr_mu_unlock(&fd->pollable.po.mu); + gpr_mu_unlock(&fd->pollable_obj.po.mu); UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); @@ -451,13 +451,13 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, * Pollable Definitions */ -static void pollable_init(pollable_t *p, polling_obj_type type) { +static void pollable_init(pollable *p, polling_obj_type type) { po_init(&p->po, type); p->root_worker = NULL; p->epfd = -1; } -static void pollable_destroy(pollable_t *p) { +static void pollable_destroy(pollable *p) { po_destroy(&p->po); if (p->epfd != -1) { close(p->epfd); @@ -466,7 +466,7 @@ static void pollable_destroy(pollable_t *p) { } /* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable_t *p) { +static grpc_error *pollable_materialize(pollable *p) { if (p->epfd == -1) { int new_epfd = epoll_create1(EPOLL_CLOEXEC); if (new_epfd < 0) { @@ -492,7 +492,7 @@ static grpc_error *pollable_materialize(pollable_t *p) { } /* pollable must be materialized */ -static grpc_error *pollable_add_fd(pollable_t *p, grpc_fd *fd) { +static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollable_add_fd"; const int epfd = p->epfd; @@ -557,31 +557,34 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_unused) { grpc_error *error = GRPC_ERROR_NONE; grpc_pollset *pollset = (grpc_pollset *)arg; - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; do { GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (worker->pollable != &pollset->pollable) { - gpr_mu_lock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&worker->pollable_obj->po.mu); } if (worker->initialized_cv && worker != pollset->root_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable, worker->pollable); + pollset, worker, &pollset->pollable_obj, + worker->pollable_obj); } worker->kicked = true; gpr_cv_signal(&worker->cv); } else { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable, worker->pollable); + pollset, worker, &pollset->pollable_obj, + worker->pollable_obj); } - append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup), + append_error(&error, + grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup), "pollset_shutdown"); } - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker->pollable_obj->po.mu); } worker = worker->links[PWL_POLLSET].next; @@ -589,7 +592,7 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, } pollset->kick_alls_pending--; pollset_maybe_finish_shutdown(exec_ctx, pollset); - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); GRPC_LOG_IF_ERROR("kick_all", error); } @@ -600,8 +603,7 @@ static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GRPC_ERROR_NONE); } -static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, pollable_t *p, +static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, grpc_pollset_worker *specific_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, @@ -666,25 +668,25 @@ static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx, /* p->po.mu must be held before calling this function */ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { - pollable_t *p = pollset->current_pollable; + pollable *p = pollset->current_pollable_obj; GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (p != &pollset->pollable) { + if (p != &pollset->pollable_obj) { gpr_mu_lock(&p->po.mu); } - grpc_error *error = pollset_kick_inner(exec_ctx, pollset, p, specific_worker); - if (p != &pollset->pollable) { + grpc_error *error = pollset_kick_inner(pollset, p, specific_worker); + if (p != &pollset->pollable_obj) { gpr_mu_unlock(&p->po.mu); } return error; } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - pollable_init(&pollset->pollable, PO_POLLSET); - pollset->current_pollable = &g_empty_pollable; + pollable_init(&pollset->pollable_obj, PO_POLLSET); + pollset->current_pollable_obj = &g_empty_pollable; pollset->kicked_without_poller = false; pollset->shutdown_closure = NULL; pollset->root_worker = NULL; - *mu = &pollset->pollable.po.mu; + *mu = &pollset->pollable_obj.po.mu; } /* Convert a timespec to milliseconds: @@ -732,8 +734,8 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "fd_become_pollable"; - if (append_error(&error, pollable_materialize(&fd->pollable), err_desc)) { - append_error(&error, pollable_add_fd(&fd->pollable, fd), err_desc); + if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) { + append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc); } return error; } @@ -747,8 +749,8 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset_maybe_finish_shutdown(exec_ctx, pollset); } -static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable_t *p) { - return p != &g_empty_pollable && p != &pollset->pollable; +static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { + return p != &g_empty_pollable && p != &pollset->pollable_obj; } static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, @@ -794,9 +796,9 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollable_destroy(&pollset->pollable); - if (pollset_is_pollable_fd(pollset, pollset->current_pollable)) { - UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, + pollable_destroy(&pollset->pollable_obj); + if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) { + UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2, "pollset_pollable"); } GRPC_LOG_IF_ERROR("pollset_process_events", @@ -804,7 +806,7 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable_t *p, gpr_timespec now, + pollable *p, gpr_timespec now, gpr_timespec deadline) { int timeout = poll_deadline_to_millis_timeout(deadline, now); @@ -886,68 +888,69 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->initialized_cv = false; worker->kicked = false; worker->pollset = pollset; - worker->pollable = pollset->current_pollable; + worker->pollable_obj = pollset->current_pollable_obj; - if (pollset_is_pollable_fd(pollset, worker->pollable)) { - REF_BY((grpc_fd *)worker->pollable, 2, "one_poll"); + if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { + REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll"); } worker_insert(&pollset->root_worker, PWL_POLLSET, worker); - if (!worker_insert(&worker->pollable->root_worker, PWL_POLLABLE, worker)) { + if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE, + worker)) { worker->initialized_cv = true; gpr_cv_init(&worker->cv); - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&pollset->pollable.po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&pollset->pollable_obj.po.mu); } if (GRPC_TRACER_ON(grpc_polling_trace) && - worker->pollable->root_worker != worker) { + worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, - worker->pollable, worker, + worker->pollable_obj, worker, poll_deadline_to_millis_timeout(deadline, *now)); } - while (do_poll && worker->pollable->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, deadline)) { + while (do_poll && worker->pollable_obj->root_worker != worker) { + if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, deadline)) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, - worker->pollable, worker); + worker->pollable_obj, worker); } do_poll = false; } else if (worker->kicked) { if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable, - worker); + gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, + worker->pollable_obj, worker); } do_poll = false; } else if (GRPC_TRACER_ON(grpc_polling_trace) && - worker->pollable->root_worker != worker) { + worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset, - worker->pollable, worker); + worker->pollable_obj, worker); } } - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&worker->pollable->po.mu); - gpr_mu_lock(&pollset->pollable.po.mu); - gpr_mu_lock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker->pollable_obj->po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); + gpr_mu_lock(&worker->pollable_obj->po.mu); } *now = gpr_now(now->clock_type); } return do_poll && pollset->shutdown_closure == NULL && - pollset->current_pollable == worker->pollable; + pollset->current_pollable_obj == worker->pollable_obj; } static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { if (NEW_ROOT == - worker_remove(&worker->pollable->root_worker, PWL_POLLABLE, worker)) { - gpr_cv_signal(&worker->pollable->root_worker->cv); + worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) { + gpr_cv_signal(&worker->pollable_obj->root_worker->cv); } if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } - if (pollset_is_pollable_fd(pollset, worker->pollable)) { - UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable, 2, "one_poll"); + if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { + UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll"); } if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) { pollset_maybe_finish_shutdown(exec_ctx, pollset); @@ -975,41 +978,41 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->kicked_without_poller = false; return GRPC_ERROR_NONE; } - if (pollset->current_pollable != &pollset->pollable) { - gpr_mu_lock(&pollset->current_pollable->po.mu); + if (pollset->current_pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&pollset->current_pollable_obj->po.mu); } if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!pollset->shutdown_closure); - append_error(&error, pollable_materialize(worker.pollable), err_desc); - if (worker.pollable != &pollset->pollable) { - gpr_mu_unlock(&worker.pollable->po.mu); + append_error(&error, pollable_materialize(worker.pollable_obj), err_desc); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker.pollable_obj->po.mu); } - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); if (pollset->event_cursor == pollset->event_count) { - append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, + append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj, now, deadline), err_desc); } append_error(&error, pollset_process_events(exec_ctx, pollset, false), err_desc); - gpr_mu_lock(&pollset->pollable.po.mu); - if (worker.pollable != &pollset->pollable) { - gpr_mu_lock(&worker.pollable->po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&worker.pollable_obj->po.mu); } gpr_tls_set(&g_current_thread_pollset, 0); gpr_tls_set(&g_current_thread_worker, 0); pollset_maybe_finish_shutdown(exec_ctx, pollset); } end_worker(exec_ctx, pollset, &worker, worker_hdl); - if (worker.pollable != &pollset->pollable) { - gpr_mu_unlock(&worker.pollable->po.mu); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker.pollable_obj->po.mu); } if (grpc_exec_ctx_has_work(exec_ctx)) { - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); } return error; } @@ -1026,27 +1029,27 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, bool fd_locked) { static const char *err_desc = "pollset_add_fd"; grpc_error *error = GRPC_ERROR_NONE; - if (pollset->current_pollable == &g_empty_pollable) { + if (pollset->current_pollable_obj == &g_empty_pollable) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from empty to fd", pollset, fd); } - /* empty pollable --> single fd pollable_t */ + /* empty pollable --> single fd pollable */ pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable = &fd->pollable; - if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu); + pollset->current_pollable_obj = &fd->pollable_obj; + if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu); append_error(&error, fd_become_pollable_locked(fd), err_desc); - if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu); + if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu); REF_BY(fd, 2, "pollset_pollable"); - } else if (pollset->current_pollable == &pollset->pollable) { + } else if (pollset->current_pollable_obj == &pollset->pollable_obj) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); } - append_error(&error, pollable_add_fd(pollset->current_pollable, fd), + append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd), err_desc); - } else if (pollset->current_pollable != &fd->pollable) { - grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable; + } else if (pollset->current_pollable_obj != &fd->pollable_obj) { + grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj; if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from fd %p to multipoller", @@ -1058,11 +1061,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read"); grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write"); pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable = &pollset->pollable; - if (append_error(&error, pollable_materialize(&pollset->pollable), + pollset->current_pollable_obj = &pollset->pollable_obj; + if (append_error(&error, pollable_materialize(&pollset->pollable_obj), err_desc)) { - pollable_add_fd(&pollset->pollable, had_fd); - pollable_add_fd(&pollset->pollable, fd); + pollable_add_fd(&pollset->pollable_obj, had_fd); + pollable_add_fd(&pollset->pollable_obj, fd); } GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd, @@ -1074,9 +1077,9 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false); - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); } @@ -1098,7 +1101,7 @@ static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_fd *fd) { - po_join(exec_ctx, &pss->po, &fd->pollable.po); + po_join(exec_ctx, &pss->po, &fd->pollable_obj.po); } static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, @@ -1106,7 +1109,7 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_pollset *ps) { - po_join(exec_ctx, &pss->po, &ps->pollable.po); + po_join(exec_ctx, &pss->po, &ps->pollable_obj.po); } static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, |