diff options
author | Craig Tiller <ctiller@google.com> | 2017-09-18 08:57:19 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-09-18 08:57:19 -0700 |
commit | adbcec4f57f7e25859326a6daf8ad7e08f2805ca (patch) | |
tree | c029f503127946f0564134d343c8af9f88ac8da5 /src/core/lib/iomgr | |
parent | 7f10c299c7eb5d6688c20ae43c63b0548ca8acd1 (diff) | |
parent | a5f46e29bcee3068f5ec6691b7e06cf944430881 (diff) |
Merge github.com:grpc/grpc into pollset_kick_stats
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.c | 54 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.c | 203 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.c | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_cv.c | 14 | ||||
-rw-r--r-- | src/core/lib/iomgr/wakeup_fd_cv.h | 4 |
5 files changed, 142 insertions, 139 deletions
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index aa861d3cc1..7058acca50 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -130,9 +130,9 @@ static void fd_global_shutdown(void); * Pollset Declarations */ -typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state_t; +typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state; -static const char *kick_state_string(kick_state_t st) { +static const char *kick_state_string(kick_state st) { switch (st) { case UNKICKED: return "UNKICKED"; @@ -145,7 +145,7 @@ static const char *kick_state_string(kick_state_t st) { } struct grpc_pollset_worker { - kick_state_t kick_state; + kick_state state; int kick_state_mutator; // which line of code last changed kick state bool initialized_cv; grpc_pollset_worker *next; @@ -154,9 +154,9 @@ struct grpc_pollset_worker { grpc_closure_list schedule_on_end_work; }; -#define SET_KICK_STATE(worker, state) \ +#define SET_KICK_STATE(worker, kick_state) \ do { \ - (worker)->kick_state = (state); \ + (worker)->state = (kick_state); \ (worker)->kick_state_mutator = __LINE__; \ } while (false) @@ -510,7 +510,7 @@ static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset_worker *worker = pollset->root_worker; do { GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - switch (worker->kick_state) { + switch (worker->state) { case KICKED: GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); break; @@ -695,7 +695,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_mu_lock(&pollset->mu); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), is_reassigning); } if (pollset->seen_inactive) { @@ -715,12 +715,12 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, at this point is if it were "kicked specifically". Since the worker has not added itself to the pollset yet (by calling worker_insert()), it is not visible in the "kick any" path yet */ - if (worker->kick_state == UNKICKED) { + if (worker->state == UNKICKED) { pollset->seen_inactive = false; if (neighborhood->active_root == NULL) { neighborhood->active_root = pollset->next = pollset->prev = pollset; /* Make this the designated poller if there isn't one already */ - if (worker->kick_state == UNKICKED && + if (worker->state == UNKICKED && gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) { SET_KICK_STATE(worker, DESIGNATED_POLLER); } @@ -740,19 +740,19 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker_insert(pollset, worker); pollset->begin_refs--; - if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) { + if (worker->state == UNKICKED && !pollset->kicked_without_poller) { GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker); worker->initialized_cv = true; gpr_cv_init(&worker->cv); - while (worker->kick_state == UNKICKED && !pollset->shutting_down) { + while (worker->state == UNKICKED && !pollset->shutting_down) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), pollset->shutting_down); } if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) && - worker->kick_state == UNKICKED) { + worker->state == UNKICKED) { /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker received a kick */ SET_KICK_STATE(worker, KICKED); @@ -765,7 +765,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_log(GPR_ERROR, "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d " "kicked_without_poller: %d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), pollset->shutting_down, pollset->kicked_without_poller); } @@ -785,7 +785,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, } GPR_TIMER_END("begin_worker", 0); - return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down; + return worker->state == DESIGNATED_POLLER && !pollset->shutting_down; } static bool check_neighborhood_for_available_poller( @@ -802,7 +802,7 @@ static bool check_neighborhood_for_available_poller( grpc_pollset_worker *inspect_worker = inspect->root_worker; if (inspect_worker != NULL) { do { - switch (inspect_worker->kick_state) { + switch (inspect_worker->state) { case UNKICKED: if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)inspect_worker)) { @@ -866,7 +866,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure_list_move(&worker->schedule_on_end_work, &exec_ctx->closure_list); if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) { - if (worker->next != worker && worker->next->kick_state == UNKICKED) { + if (worker->next != worker && worker->next->state == UNKICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker); } @@ -1005,14 +1005,14 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_strvec_add(&log, tmp); if (pollset->root_worker != NULL) { gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}", - kick_state_string(pollset->root_worker->kick_state), + kick_state_string(pollset->root_worker->state), pollset->root_worker->next, - kick_state_string(pollset->root_worker->next->kick_state)); + kick_state_string(pollset->root_worker->next->state)); gpr_strvec_add(&log, tmp); } if (specific_worker != NULL) { gpr_asprintf(&tmp, " worker_kick_state=%s", - kick_state_string(specific_worker->kick_state)); + kick_state_string(specific_worker->state)); gpr_strvec_add(&log, tmp); } tmp = gpr_strvec_flatten(&log, NULL); @@ -1033,14 +1033,14 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, goto done; } grpc_pollset_worker *next_worker = root_worker->next; - if (root_worker->kick_state == KICKED) { + if (root_worker->state == KICKED) { GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. already kicked %p", root_worker); } SET_KICK_STATE(root_worker, KICKED); goto done; - } else if (next_worker->kick_state == KICKED) { + } else if (next_worker->state == KICKED) { GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. already kicked %p", next_worker); @@ -1059,7 +1059,7 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, SET_KICK_STATE(root_worker, KICKED); ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd); goto done; - } else if (next_worker->kick_state == UNKICKED) { + } else if (next_worker->state == UNKICKED) { GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. kicked %p", next_worker); @@ -1068,8 +1068,8 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, SET_KICK_STATE(next_worker, KICKED); gpr_cv_signal(&next_worker->cv); goto done; - } else if (next_worker->kick_state == DESIGNATED_POLLER) { - if (root_worker->kick_state != DESIGNATED_POLLER) { + } else if (next_worker->state == DESIGNATED_POLLER) { + if (root_worker->state != DESIGNATED_POLLER) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log( GPR_ERROR, @@ -1094,7 +1094,7 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); - GPR_ASSERT(next_worker->kick_state == KICKED); + GPR_ASSERT(next_worker->state == KICKED); SET_KICK_STATE(next_worker, KICKED); goto done; } @@ -1109,7 +1109,7 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GPR_UNREACHABLE_CODE(goto done); } - if (specific_worker->kick_state == KICKED) { + if (specific_worker->state == KICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. specific worker already kicked"); } diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 4e5bed5016..2a276ca360 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -97,12 +97,12 @@ static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, * pollable Declarations */ -typedef struct pollable_t { +typedef struct pollable { polling_obj po; int epfd; grpc_wakeup_fd wakeup; grpc_pollset_worker *root_worker; -} pollable_t; +} pollable; static const char *polling_obj_type_string(polling_obj_type t) { switch (t) { @@ -122,7 +122,7 @@ static const char *polling_obj_type_string(polling_obj_type t) { return "<invalid>"; } -static char *pollable_desc(pollable_t *p) { +static char *pollable_desc(pollable *p) { char *out; gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d", polling_obj_type_string(p->po.type), p->po.group, p->epfd, @@ -130,19 +130,19 @@ static char *pollable_desc(pollable_t *p) { return out; } -static pollable_t g_empty_pollable; +static pollable g_empty_pollable; -static void pollable_init(pollable_t *p, polling_obj_type type); -static void pollable_destroy(pollable_t *p); +static void pollable_init(pollable *p, polling_obj_type type); +static void pollable_destroy(pollable *p); /* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable_t *p); +static grpc_error *pollable_materialize(pollable *p); /******************************************************************************* * Fd Declarations */ struct grpc_fd { - pollable_t pollable; + pollable pollable_obj; int fd; /* refst format: bit 0 : 1=Active / 0=Orphaned @@ -193,15 +193,15 @@ struct grpc_pollset_worker { pollset_worker_link links[POLLSET_WORKER_LINK_COUNT]; gpr_cv cv; grpc_pollset *pollset; - pollable_t *pollable; + pollable *pollable_obj; }; #define MAX_EPOLL_EVENTS 100 #define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 struct grpc_pollset { - pollable_t pollable; - pollable_t *current_pollable; + pollable pollable_obj; + pollable *current_pollable_obj; int kick_alls_pending; bool kicked_without_poller; grpc_closure *shutdown_closure; @@ -282,7 +282,7 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_fd *fd = (grpc_fd *)arg; /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); - pollable_destroy(&fd->pollable); + pollable_destroy(&fd->pollable_obj); gpr_mu_destroy(&fd->orphaned_mu); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -343,7 +343,7 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); } - pollable_init(&new_fd->pollable, PO_FD); + pollable_init(&new_fd->pollable_obj, PO_FD); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; @@ -385,7 +385,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, bool is_fd_closed = already_closed; grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_lock(&fd->pollable.po.mu); + gpr_mu_lock(&fd->pollable_obj.po.mu); gpr_mu_lock(&fd->orphaned_mu); fd->on_done_closure = on_done; @@ -411,7 +411,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->orphaned_mu); - gpr_mu_unlock(&fd->pollable.po.mu); + gpr_mu_unlock(&fd->pollable_obj.po.mu); UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); @@ -451,13 +451,13 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, * Pollable Definitions */ -static void pollable_init(pollable_t *p, polling_obj_type type) { +static void pollable_init(pollable *p, polling_obj_type type) { po_init(&p->po, type); p->root_worker = NULL; p->epfd = -1; } -static void pollable_destroy(pollable_t *p) { +static void pollable_destroy(pollable *p) { po_destroy(&p->po); if (p->epfd != -1) { close(p->epfd); @@ -466,7 +466,7 @@ static void pollable_destroy(pollable_t *p) { } /* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable_t *p) { +static grpc_error *pollable_materialize(pollable *p) { if (p->epfd == -1) { int new_epfd = epoll_create1(EPOLL_CLOEXEC); if (new_epfd < 0) { @@ -492,7 +492,7 @@ static grpc_error *pollable_materialize(pollable_t *p) { } /* pollable must be materialized */ -static grpc_error *pollable_add_fd(pollable_t *p, grpc_fd *fd) { +static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollable_add_fd"; const int epfd = p->epfd; @@ -557,31 +557,34 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_unused) { grpc_error *error = GRPC_ERROR_NONE; grpc_pollset *pollset = (grpc_pollset *)arg; - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; do { GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (worker->pollable != &pollset->pollable) { - gpr_mu_lock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&worker->pollable_obj->po.mu); } if (worker->initialized_cv && worker != pollset->root_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable, worker->pollable); + pollset, worker, &pollset->pollable_obj, + worker->pollable_obj); } worker->kicked = true; gpr_cv_signal(&worker->cv); } else { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable, worker->pollable); + pollset, worker, &pollset->pollable_obj, + worker->pollable_obj); } - append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup), + append_error(&error, + grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup), "pollset_shutdown"); } - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker->pollable_obj->po.mu); } worker = worker->links[PWL_POLLSET].next; @@ -589,7 +592,7 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, } pollset->kick_alls_pending--; pollset_maybe_finish_shutdown(exec_ctx, pollset); - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); GRPC_LOG_IF_ERROR("kick_all", error); } @@ -600,8 +603,7 @@ static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GRPC_ERROR_NONE); } -static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, pollable_t *p, +static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, grpc_pollset_worker *specific_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, @@ -666,25 +668,25 @@ static grpc_error *pollset_kick_inner(grpc_exec_ctx *exec_ctx, /* p->po.mu must be held before calling this function */ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { - pollable_t *p = pollset->current_pollable; + pollable *p = pollset->current_pollable_obj; GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (p != &pollset->pollable) { + if (p != &pollset->pollable_obj) { gpr_mu_lock(&p->po.mu); } - grpc_error *error = pollset_kick_inner(exec_ctx, pollset, p, specific_worker); - if (p != &pollset->pollable) { + grpc_error *error = pollset_kick_inner(pollset, p, specific_worker); + if (p != &pollset->pollable_obj) { gpr_mu_unlock(&p->po.mu); } return error; } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - pollable_init(&pollset->pollable, PO_POLLSET); - pollset->current_pollable = &g_empty_pollable; + pollable_init(&pollset->pollable_obj, PO_POLLSET); + pollset->current_pollable_obj = &g_empty_pollable; pollset->kicked_without_poller = false; pollset->shutdown_closure = NULL; pollset->root_worker = NULL; - *mu = &pollset->pollable.po.mu; + *mu = &pollset->pollable_obj.po.mu; } /* Convert a timespec to milliseconds: @@ -732,8 +734,8 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "fd_become_pollable"; - if (append_error(&error, pollable_materialize(&fd->pollable), err_desc)) { - append_error(&error, pollable_add_fd(&fd->pollable, fd), err_desc); + if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) { + append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc); } return error; } @@ -747,8 +749,8 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset_maybe_finish_shutdown(exec_ctx, pollset); } -static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable_t *p) { - return p != &g_empty_pollable && p != &pollset->pollable; +static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { + return p != &g_empty_pollable && p != &pollset->pollable_obj; } static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, @@ -794,9 +796,9 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollable_destroy(&pollset->pollable); - if (pollset_is_pollable_fd(pollset, pollset->current_pollable)) { - UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, + pollable_destroy(&pollset->pollable_obj); + if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) { + UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2, "pollset_pollable"); } GRPC_LOG_IF_ERROR("pollset_process_events", @@ -804,7 +806,7 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable_t *p, gpr_timespec now, + pollable *p, gpr_timespec now, gpr_timespec deadline) { int timeout = poll_deadline_to_millis_timeout(deadline, now); @@ -886,68 +888,69 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->initialized_cv = false; worker->kicked = false; worker->pollset = pollset; - worker->pollable = pollset->current_pollable; + worker->pollable_obj = pollset->current_pollable_obj; - if (pollset_is_pollable_fd(pollset, worker->pollable)) { - REF_BY((grpc_fd *)worker->pollable, 2, "one_poll"); + if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { + REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll"); } worker_insert(&pollset->root_worker, PWL_POLLSET, worker); - if (!worker_insert(&worker->pollable->root_worker, PWL_POLLABLE, worker)) { + if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE, + worker)) { worker->initialized_cv = true; gpr_cv_init(&worker->cv); - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&pollset->pollable.po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&pollset->pollable_obj.po.mu); } if (GRPC_TRACER_ON(grpc_polling_trace) && - worker->pollable->root_worker != worker) { + worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, - worker->pollable, worker, + worker->pollable_obj, worker, poll_deadline_to_millis_timeout(deadline, *now)); } - while (do_poll && worker->pollable->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, deadline)) { + while (do_poll && worker->pollable_obj->root_worker != worker) { + if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, deadline)) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, - worker->pollable, worker); + worker->pollable_obj, worker); } do_poll = false; } else if (worker->kicked) { if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable, - worker); + gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, + worker->pollable_obj, worker); } do_poll = false; } else if (GRPC_TRACER_ON(grpc_polling_trace) && - worker->pollable->root_worker != worker) { + worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset, - worker->pollable, worker); + worker->pollable_obj, worker); } } - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&worker->pollable->po.mu); - gpr_mu_lock(&pollset->pollable.po.mu); - gpr_mu_lock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker->pollable_obj->po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); + gpr_mu_lock(&worker->pollable_obj->po.mu); } *now = gpr_now(now->clock_type); } return do_poll && pollset->shutdown_closure == NULL && - pollset->current_pollable == worker->pollable; + pollset->current_pollable_obj == worker->pollable_obj; } static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { if (NEW_ROOT == - worker_remove(&worker->pollable->root_worker, PWL_POLLABLE, worker)) { - gpr_cv_signal(&worker->pollable->root_worker->cv); + worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) { + gpr_cv_signal(&worker->pollable_obj->root_worker->cv); } if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } - if (pollset_is_pollable_fd(pollset, worker->pollable)) { - UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable, 2, "one_poll"); + if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { + UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll"); } if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) { pollset_maybe_finish_shutdown(exec_ctx, pollset); @@ -975,41 +978,41 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->kicked_without_poller = false; return GRPC_ERROR_NONE; } - if (pollset->current_pollable != &pollset->pollable) { - gpr_mu_lock(&pollset->current_pollable->po.mu); + if (pollset->current_pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&pollset->current_pollable_obj->po.mu); } if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!pollset->shutdown_closure); - append_error(&error, pollable_materialize(worker.pollable), err_desc); - if (worker.pollable != &pollset->pollable) { - gpr_mu_unlock(&worker.pollable->po.mu); + append_error(&error, pollable_materialize(worker.pollable_obj), err_desc); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker.pollable_obj->po.mu); } - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); if (pollset->event_cursor == pollset->event_count) { - append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, + append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj, now, deadline), err_desc); } append_error(&error, pollset_process_events(exec_ctx, pollset, false), err_desc); - gpr_mu_lock(&pollset->pollable.po.mu); - if (worker.pollable != &pollset->pollable) { - gpr_mu_lock(&worker.pollable->po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&worker.pollable_obj->po.mu); } gpr_tls_set(&g_current_thread_pollset, 0); gpr_tls_set(&g_current_thread_worker, 0); pollset_maybe_finish_shutdown(exec_ctx, pollset); } end_worker(exec_ctx, pollset, &worker, worker_hdl); - if (worker.pollable != &pollset->pollable) { - gpr_mu_unlock(&worker.pollable->po.mu); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker.pollable_obj->po.mu); } if (grpc_exec_ctx_has_work(exec_ctx)) { - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); } return error; } @@ -1026,27 +1029,27 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, bool fd_locked) { static const char *err_desc = "pollset_add_fd"; grpc_error *error = GRPC_ERROR_NONE; - if (pollset->current_pollable == &g_empty_pollable) { + if (pollset->current_pollable_obj == &g_empty_pollable) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from empty to fd", pollset, fd); } - /* empty pollable --> single fd pollable_t */ + /* empty pollable --> single fd pollable */ pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable = &fd->pollable; - if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu); + pollset->current_pollable_obj = &fd->pollable_obj; + if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu); append_error(&error, fd_become_pollable_locked(fd), err_desc); - if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu); + if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu); REF_BY(fd, 2, "pollset_pollable"); - } else if (pollset->current_pollable == &pollset->pollable) { + } else if (pollset->current_pollable_obj == &pollset->pollable_obj) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); } - append_error(&error, pollable_add_fd(pollset->current_pollable, fd), + append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd), err_desc); - } else if (pollset->current_pollable != &fd->pollable) { - grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable; + } else if (pollset->current_pollable_obj != &fd->pollable_obj) { + grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj; if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from fd %p to multipoller", @@ -1058,11 +1061,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read"); grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write"); pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable = &pollset->pollable; - if (append_error(&error, pollable_materialize(&pollset->pollable), + pollset->current_pollable_obj = &pollset->pollable_obj; + if (append_error(&error, pollable_materialize(&pollset->pollable_obj), err_desc)) { - pollable_add_fd(&pollset->pollable, had_fd); - pollable_add_fd(&pollset->pollable, fd); + pollable_add_fd(&pollset->pollable_obj, had_fd); + pollable_add_fd(&pollset->pollable_obj, fd); } GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd, @@ -1074,9 +1077,9 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false); - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); } @@ -1098,7 +1101,7 @@ static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_fd *fd) { - po_join(exec_ctx, &pss->po, &fd->pollable.po); + po_join(exec_ctx, &pss->po, &fd->pollable_obj.po); } static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, @@ -1106,7 +1109,7 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_pollset *ps) { - po_join(exec_ctx, &pss->po, &ps->pollable.po); + po_join(exec_ctx, &pss->po, &ps->pollable_obj.po); } static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 15968d23ba..b8482b9d3a 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -1543,7 +1543,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { for (i = 0; i < nfds; i++) { fds[i].revents = 0; if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - idx = FD_TO_IDX(fds[i].fd); + idx = GRPC_FD_TO_IDX(fds[i].fd); fd_cvs[i].cv = &pollcv_cv; fd_cvs[i].prev = NULL; fd_cvs[i].next = g_cvfds.cvfds[idx].cvs; @@ -1606,8 +1606,8 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { idx = 0; for (i = 0; i < nfds; i++) { if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i])); - if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) { + remove_cvn(&g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i])); + if (g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].is_set) { fds[i].revents = POLLIN; if (res >= 0) res++; } diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c index 5e0b1d1704..268e0175dd 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.c +++ b/src/core/lib/iomgr/wakeup_fd_cv.c @@ -57,7 +57,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { g_cvfds.free_fds = g_cvfds.free_fds->next_free; g_cvfds.cvfds[idx].cvs = NULL; g_cvfds.cvfds[idx].is_set = 0; - fd_info->read_fd = IDX_TO_FD(idx); + fd_info->read_fd = GRPC_IDX_TO_FD(idx); fd_info->write_fd = -1; gpr_mu_unlock(&g_cvfds.mu); return GRPC_ERROR_NONE; @@ -66,8 +66,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { cv_node* cvn; gpr_mu_lock(&g_cvfds.mu); - g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 1; - cvn = g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs; + g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1; + cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs; while (cvn) { gpr_cv_signal(cvn->cv); cvn = cvn->next; @@ -78,7 +78,7 @@ static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { static grpc_error* cv_fd_consume(grpc_wakeup_fd* fd_info) { gpr_mu_lock(&g_cvfds.mu); - g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 0; + g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 0; gpr_mu_unlock(&g_cvfds.mu); return GRPC_ERROR_NONE; } @@ -89,9 +89,9 @@ static void cv_fd_destroy(grpc_wakeup_fd* fd_info) { } gpr_mu_lock(&g_cvfds.mu); // Assert that there are no active pollers - GPR_ASSERT(!g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs); - g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds; - g_cvfds.free_fds = &g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)]; + GPR_ASSERT(!g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs); + g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds; + g_cvfds.free_fds = &g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)]; gpr_mu_unlock(&g_cvfds.mu); } diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index 46e84f5843..dc170ad5b4 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -37,8 +37,8 @@ #include "src/core/lib/iomgr/ev_posix.h" -#define FD_TO_IDX(fd) (-(fd)-1) -#define IDX_TO_FD(idx) (-(idx)-1) +#define GRPC_FD_TO_IDX(fd) (-(fd)-1) +#define GRPC_IDX_TO_FD(idx) (-(idx)-1) typedef struct cv_node { gpr_cv* cv; |