diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/iomgr/ev_epoll1_linux.c | 54 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.c | 200 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 66 | ||||
-rw-r--r-- | src/core/lib/surface/call.h | 2 | ||||
-rw-r--r-- | src/core/lib/surface/channel.c | 2 |
5 files changed, 164 insertions, 160 deletions
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 6946a2cbf5..0d57833d85 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -130,9 +130,9 @@ static void fd_global_shutdown(void); * Pollset Declarations */ -typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state_t; +typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state; -static const char *kick_state_string(kick_state_t st) { +static const char *kick_state_string(kick_state st) { switch (st) { case UNKICKED: return "UNKICKED"; @@ -145,7 +145,7 @@ static const char *kick_state_string(kick_state_t st) { } struct grpc_pollset_worker { - kick_state_t kick_state; + kick_state state; int kick_state_mutator; // which line of code last changed kick state bool initialized_cv; grpc_pollset_worker *next; @@ -154,9 +154,9 @@ struct grpc_pollset_worker { grpc_closure_list schedule_on_end_work; }; -#define SET_KICK_STATE(worker, state) \ +#define SET_KICK_STATE(worker, kick_state) \ do { \ - (worker)->kick_state = (state); \ + (worker)->state = (kick_state); \ (worker)->kick_state_mutator = __LINE__; \ } while (false) @@ -508,7 +508,7 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; do { - switch (worker->kick_state) { + switch (worker->state) { case KICKED: break; case UNKICKED: @@ -688,7 +688,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_mu_lock(&pollset->mu); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), is_reassigning); } if (pollset->seen_inactive) { @@ -708,12 +708,12 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, at this point is if it were "kicked specifically". Since the worker has not added itself to the pollset yet (by calling worker_insert()), it is not visible in the "kick any" path yet */ - if (worker->kick_state == UNKICKED) { + if (worker->state == UNKICKED) { pollset->seen_inactive = false; if (neighborhood->active_root == NULL) { neighborhood->active_root = pollset->next = pollset->prev = pollset; /* Make this the designated poller if there isn't one already */ - if (worker->kick_state == UNKICKED && + if (worker->state == UNKICKED && gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) { SET_KICK_STATE(worker, DESIGNATED_POLLER); } @@ -733,19 +733,19 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker_insert(pollset, worker); pollset->begin_refs--; - if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) { + if (worker->state == UNKICKED && !pollset->kicked_without_poller) { GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker); worker->initialized_cv = true; gpr_cv_init(&worker->cv); - while (worker->kick_state == UNKICKED && !pollset->shutting_down) { + while (worker->state == UNKICKED && !pollset->shutting_down) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), pollset->shutting_down); } if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) && - worker->kick_state == UNKICKED) { + worker->state == UNKICKED) { /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker received a kick */ SET_KICK_STATE(worker, KICKED); @@ -758,7 +758,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_log(GPR_ERROR, "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d " "kicked_without_poller: %d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), pollset->shutting_down, pollset->kicked_without_poller); } @@ -778,7 +778,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, } GPR_TIMER_END("begin_worker", 0); - return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down; + return worker->state == DESIGNATED_POLLER && !pollset->shutting_down; } static bool check_neighborhood_for_available_poller( @@ -795,7 +795,7 @@ static bool check_neighborhood_for_available_poller( grpc_pollset_worker *inspect_worker = inspect->root_worker; if (inspect_worker != NULL) { do { - switch (inspect_worker->kick_state) { + switch (inspect_worker->state) { case UNKICKED: if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)inspect_worker)) { @@ -858,7 +858,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure_list_move(&worker->schedule_on_end_work, &exec_ctx->closure_list); if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) { - if (worker->next != worker && worker->next->kick_state == UNKICKED) { + if (worker->next != worker && worker->next->state == UNKICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker); } @@ -993,14 +993,14 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, gpr_strvec_add(&log, tmp); if (pollset->root_worker != NULL) { gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}", - kick_state_string(pollset->root_worker->kick_state), + kick_state_string(pollset->root_worker->state), pollset->root_worker->next, - kick_state_string(pollset->root_worker->next->kick_state)); + kick_state_string(pollset->root_worker->next->state)); gpr_strvec_add(&log, tmp); } if (specific_worker != NULL) { gpr_asprintf(&tmp, " worker_kick_state=%s", - kick_state_string(specific_worker->kick_state)); + kick_state_string(specific_worker->state)); gpr_strvec_add(&log, tmp); } tmp = gpr_strvec_flatten(&log, NULL); @@ -1020,13 +1020,13 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, goto done; } grpc_pollset_worker *next_worker = root_worker->next; - if (root_worker->kick_state == KICKED) { + if (root_worker->state == KICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. already kicked %p", root_worker); } SET_KICK_STATE(root_worker, KICKED); goto done; - } else if (next_worker->kick_state == KICKED) { + } else if (next_worker->state == KICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. already kicked %p", next_worker); } @@ -1043,7 +1043,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, SET_KICK_STATE(root_worker, KICKED); ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd); goto done; - } else if (next_worker->kick_state == UNKICKED) { + } else if (next_worker->state == UNKICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. kicked %p", next_worker); } @@ -1051,8 +1051,8 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, SET_KICK_STATE(next_worker, KICKED); gpr_cv_signal(&next_worker->cv); goto done; - } else if (next_worker->kick_state == DESIGNATED_POLLER) { - if (root_worker->kick_state != DESIGNATED_POLLER) { + } else if (next_worker->state == DESIGNATED_POLLER) { + if (root_worker->state != DESIGNATED_POLLER) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log( GPR_ERROR, @@ -1074,7 +1074,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, goto done; } } else { - GPR_ASSERT(next_worker->kick_state == KICKED); + GPR_ASSERT(next_worker->state == KICKED); SET_KICK_STATE(next_worker, KICKED); goto done; } @@ -1088,7 +1088,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, GPR_UNREACHABLE_CODE(goto done); } - if (specific_worker->kick_state == KICKED) { + if (specific_worker->state == KICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. specific worker already kicked"); } diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index df69025f1a..51200fc064 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -97,12 +97,12 @@ static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, * pollable Declarations */ -typedef struct pollable_t { +typedef struct pollable { polling_obj po; int epfd; grpc_wakeup_fd wakeup; grpc_pollset_worker *root_worker; -} pollable_t; +} pollable; static const char *polling_obj_type_string(polling_obj_type t) { switch (t) { @@ -122,7 +122,7 @@ static const char *polling_obj_type_string(polling_obj_type t) { return "<invalid>"; } -static char *pollable_desc(pollable_t *p) { +static char *pollable_desc(pollable *p) { char *out; gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d", polling_obj_type_string(p->po.type), p->po.group, p->epfd, @@ -130,19 +130,19 @@ static char *pollable_desc(pollable_t *p) { return out; } -static pollable_t g_empty_pollable; +static pollable g_empty_pollable; -static void pollable_init(pollable_t *p, polling_obj_type type); -static void pollable_destroy(pollable_t *p); +static void pollable_init(pollable *p, polling_obj_type type); +static void pollable_destroy(pollable *p); /* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable_t *p); +static grpc_error *pollable_materialize(pollable *p); /******************************************************************************* * Fd Declarations */ struct grpc_fd { - pollable_t pollable; + pollable pollable_obj; int fd; /* refst format: bit 0 : 1=Active / 0=Orphaned @@ -193,15 +193,15 @@ struct grpc_pollset_worker { pollset_worker_link links[POLLSET_WORKER_LINK_COUNT]; gpr_cv cv; grpc_pollset *pollset; - pollable_t *pollable; + pollable *pollable_obj; }; #define MAX_EPOLL_EVENTS 100 #define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 struct grpc_pollset { - pollable_t pollable; - pollable_t *current_pollable; + pollable pollable_obj; + pollable *current_pollable_obj; int kick_alls_pending; bool kicked_without_poller; grpc_closure *shutdown_closure; @@ -282,7 +282,7 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_fd *fd = (grpc_fd *)arg; /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); - pollable_destroy(&fd->pollable); + pollable_destroy(&fd->pollable_obj); gpr_mu_destroy(&fd->orphaned_mu); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -343,7 +343,7 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); } - pollable_init(&new_fd->pollable, PO_FD); + pollable_init(&new_fd->pollable_obj, PO_FD); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; @@ -385,7 +385,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, bool is_fd_closed = already_closed; grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_lock(&fd->pollable.po.mu); + gpr_mu_lock(&fd->pollable_obj.po.mu); gpr_mu_lock(&fd->orphaned_mu); fd->on_done_closure = on_done; @@ -411,7 +411,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->orphaned_mu); - gpr_mu_unlock(&fd->pollable.po.mu); + gpr_mu_unlock(&fd->pollable_obj.po.mu); UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); @@ -451,13 +451,13 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, * Pollable Definitions */ -static void pollable_init(pollable_t *p, polling_obj_type type) { +static void pollable_init(pollable *p, polling_obj_type type) { po_init(&p->po, type); p->root_worker = NULL; p->epfd = -1; } -static void pollable_destroy(pollable_t *p) { +static void pollable_destroy(pollable *p) { po_destroy(&p->po); if (p->epfd != -1) { close(p->epfd); @@ -466,7 +466,7 @@ static void pollable_destroy(pollable_t *p) { } /* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable_t *p) { +static grpc_error *pollable_materialize(pollable *p) { if (p->epfd == -1) { int new_epfd = epoll_create1(EPOLL_CLOEXEC); if (new_epfd < 0) { @@ -492,7 +492,7 @@ static grpc_error *pollable_materialize(pollable_t *p) { } /* pollable must be materialized */ -static grpc_error *pollable_add_fd(pollable_t *p, grpc_fd *fd) { +static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollable_add_fd"; const int epfd = p->epfd; @@ -557,30 +557,33 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_unused) { grpc_error *error = GRPC_ERROR_NONE; grpc_pollset *pollset = (grpc_pollset *)arg; - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; do { - if (worker->pollable != &pollset->pollable) { - gpr_mu_lock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&worker->pollable_obj->po.mu); } if (worker->initialized_cv && worker != pollset->root_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable, worker->pollable); + pollset, worker, &pollset->pollable_obj, + worker->pollable_obj); } worker->kicked = true; gpr_cv_signal(&worker->cv); } else { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable, worker->pollable); + pollset, worker, &pollset->pollable_obj, + worker->pollable_obj); } - append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup), + append_error(&error, + grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup), "pollset_shutdown"); } - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker->pollable_obj->po.mu); } worker = worker->links[PWL_POLLSET].next; @@ -588,7 +591,7 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, } pollset->kick_alls_pending--; pollset_maybe_finish_shutdown(exec_ctx, pollset); - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); GRPC_LOG_IF_ERROR("kick_all", error); } @@ -599,7 +602,7 @@ static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GRPC_ERROR_NONE); } -static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable_t *p, +static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, grpc_pollset_worker *specific_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, @@ -664,24 +667,24 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable_t *p, /* p->po.mu must be held before calling this function */ static grpc_error *pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { - pollable_t *p = pollset->current_pollable; - if (p != &pollset->pollable) { + pollable *p = pollset->current_pollable_obj; + if (p != &pollset->pollable_obj) { gpr_mu_lock(&p->po.mu); } grpc_error *error = pollset_kick_inner(pollset, p, specific_worker); - if (p != &pollset->pollable) { + if (p != &pollset->pollable_obj) { gpr_mu_unlock(&p->po.mu); } return error; } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - pollable_init(&pollset->pollable, PO_POLLSET); - pollset->current_pollable = &g_empty_pollable; + pollable_init(&pollset->pollable_obj, PO_POLLSET); + pollset->current_pollable_obj = &g_empty_pollable; pollset->kicked_without_poller = false; pollset->shutdown_closure = NULL; pollset->root_worker = NULL; - *mu = &pollset->pollable.po.mu; + *mu = &pollset->pollable_obj.po.mu; } /* Convert a timespec to milliseconds: @@ -729,8 +732,8 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "fd_become_pollable"; - if (append_error(&error, pollable_materialize(&fd->pollable), err_desc)) { - append_error(&error, pollable_add_fd(&fd->pollable, fd), err_desc); + if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) { + append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc); } return error; } @@ -744,8 +747,8 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset_maybe_finish_shutdown(exec_ctx, pollset); } -static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable_t *p) { - return p != &g_empty_pollable && p != &pollset->pollable; +static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { + return p != &g_empty_pollable && p != &pollset->pollable_obj; } static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, @@ -791,9 +794,9 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollable_destroy(&pollset->pollable); - if (pollset_is_pollable_fd(pollset, pollset->current_pollable)) { - UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, + pollable_destroy(&pollset->pollable_obj); + if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) { + UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2, "pollset_pollable"); } GRPC_LOG_IF_ERROR("pollset_process_events", @@ -801,7 +804,7 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable_t *p, gpr_timespec now, + pollable *p, gpr_timespec now, gpr_timespec deadline) { int timeout = poll_deadline_to_millis_timeout(deadline, now); @@ -883,68 +886,69 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->initialized_cv = false; worker->kicked = false; worker->pollset = pollset; - worker->pollable = pollset->current_pollable; + worker->pollable_obj = pollset->current_pollable_obj; - if (pollset_is_pollable_fd(pollset, worker->pollable)) { - REF_BY((grpc_fd *)worker->pollable, 2, "one_poll"); + if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { + REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll"); } worker_insert(&pollset->root_worker, PWL_POLLSET, worker); - if (!worker_insert(&worker->pollable->root_worker, PWL_POLLABLE, worker)) { + if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE, + worker)) { worker->initialized_cv = true; gpr_cv_init(&worker->cv); - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&pollset->pollable.po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&pollset->pollable_obj.po.mu); } if (GRPC_TRACER_ON(grpc_polling_trace) && - worker->pollable->root_worker != worker) { + worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, - worker->pollable, worker, + worker->pollable_obj, worker, poll_deadline_to_millis_timeout(deadline, *now)); } - while (do_poll && worker->pollable->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, deadline)) { + while (do_poll && worker->pollable_obj->root_worker != worker) { + if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, deadline)) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, - worker->pollable, worker); + worker->pollable_obj, worker); } do_poll = false; } else if (worker->kicked) { if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable, - worker); + gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, + worker->pollable_obj, worker); } do_poll = false; } else if (GRPC_TRACER_ON(grpc_polling_trace) && - worker->pollable->root_worker != worker) { + worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset, - worker->pollable, worker); + worker->pollable_obj, worker); } } - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&worker->pollable->po.mu); - gpr_mu_lock(&pollset->pollable.po.mu); - gpr_mu_lock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker->pollable_obj->po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); + gpr_mu_lock(&worker->pollable_obj->po.mu); } *now = gpr_now(now->clock_type); } return do_poll && pollset->shutdown_closure == NULL && - pollset->current_pollable == worker->pollable; + pollset->current_pollable_obj == worker->pollable_obj; } static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { if (NEW_ROOT == - worker_remove(&worker->pollable->root_worker, PWL_POLLABLE, worker)) { - gpr_cv_signal(&worker->pollable->root_worker->cv); + worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) { + gpr_cv_signal(&worker->pollable_obj->root_worker->cv); } if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } - if (pollset_is_pollable_fd(pollset, worker->pollable)) { - UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable, 2, "one_poll"); + if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { + UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll"); } if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) { pollset_maybe_finish_shutdown(exec_ctx, pollset); @@ -972,41 +976,41 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->kicked_without_poller = false; return GRPC_ERROR_NONE; } - if (pollset->current_pollable != &pollset->pollable) { - gpr_mu_lock(&pollset->current_pollable->po.mu); + if (pollset->current_pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&pollset->current_pollable_obj->po.mu); } if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!pollset->shutdown_closure); - append_error(&error, pollable_materialize(worker.pollable), err_desc); - if (worker.pollable != &pollset->pollable) { - gpr_mu_unlock(&worker.pollable->po.mu); + append_error(&error, pollable_materialize(worker.pollable_obj), err_desc); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker.pollable_obj->po.mu); } - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); if (pollset->event_cursor == pollset->event_count) { - append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable, + append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj, now, deadline), err_desc); } append_error(&error, pollset_process_events(exec_ctx, pollset, false), err_desc); - gpr_mu_lock(&pollset->pollable.po.mu); - if (worker.pollable != &pollset->pollable) { - gpr_mu_lock(&worker.pollable->po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&worker.pollable_obj->po.mu); } gpr_tls_set(&g_current_thread_pollset, 0); gpr_tls_set(&g_current_thread_worker, 0); pollset_maybe_finish_shutdown(exec_ctx, pollset); } end_worker(exec_ctx, pollset, &worker, worker_hdl); - if (worker.pollable != &pollset->pollable) { - gpr_mu_unlock(&worker.pollable->po.mu); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker.pollable_obj->po.mu); } if (grpc_exec_ctx_has_work(exec_ctx)) { - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); } return error; } @@ -1023,27 +1027,27 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, bool fd_locked) { static const char *err_desc = "pollset_add_fd"; grpc_error *error = GRPC_ERROR_NONE; - if (pollset->current_pollable == &g_empty_pollable) { + if (pollset->current_pollable_obj == &g_empty_pollable) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from empty to fd", pollset, fd); } - /* empty pollable --> single fd pollable_t */ + /* empty pollable --> single fd pollable */ pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable = &fd->pollable; - if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu); + pollset->current_pollable_obj = &fd->pollable_obj; + if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu); append_error(&error, fd_become_pollable_locked(fd), err_desc); - if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu); + if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu); REF_BY(fd, 2, "pollset_pollable"); - } else if (pollset->current_pollable == &pollset->pollable) { + } else if (pollset->current_pollable_obj == &pollset->pollable_obj) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); } - append_error(&error, pollable_add_fd(pollset->current_pollable, fd), + append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd), err_desc); - } else if (pollset->current_pollable != &fd->pollable) { - grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable; + } else if (pollset->current_pollable_obj != &fd->pollable_obj) { + grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj; if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from fd %p to multipoller", @@ -1055,11 +1059,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read"); grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write"); pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable = &pollset->pollable; - if (append_error(&error, pollable_materialize(&pollset->pollable), + pollset->current_pollable_obj = &pollset->pollable_obj; + if (append_error(&error, pollable_materialize(&pollset->pollable_obj), err_desc)) { - pollable_add_fd(&pollset->pollable, had_fd); - pollable_add_fd(&pollset->pollable, fd); + pollable_add_fd(&pollset->pollable_obj, had_fd); + pollable_add_fd(&pollset->pollable_obj, fd); } GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd, @@ -1071,9 +1075,9 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false); - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); } @@ -1095,7 +1099,7 @@ static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_fd *fd) { - po_join(exec_ctx, &pss->po, &fd->pollable.po); + po_join(exec_ctx, &pss->po, &fd->pollable_obj.po); } static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, @@ -1103,7 +1107,7 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_pollset *ps) { - po_join(exec_ctx, &pss->po, &ps->pollable.po); + po_join(exec_ctx, &pss->po, &ps->pollable_obj.po); } static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index de5aa7b6be..e258719c4d 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -135,7 +135,7 @@ typedef struct batch_control { typedef struct { gpr_mu child_list_mu; grpc_call *first_child; -} parent_call_t; +} parent_call; typedef struct { grpc_call *parent; @@ -144,7 +144,7 @@ typedef struct { parent->mu */ grpc_call *sibling_next; grpc_call *sibling_prev; -} child_call_t; +} child_call; #define RECV_NONE ((gpr_atm)0) #define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1) @@ -157,8 +157,8 @@ struct grpc_call { grpc_polling_entity pollent; grpc_channel *channel; gpr_timespec start_time; - /* parent_call_t* */ gpr_atm parent_call_atm; - child_call_t *child_call; + /* parent_call* */ gpr_atm parent_call_atm; + child_call *child; /* client or server call */ bool is_client; @@ -304,21 +304,21 @@ void *grpc_call_arena_alloc(grpc_call *call, size_t size) { return gpr_arena_alloc(call->arena, size); } -static parent_call_t *get_or_create_parent_call(grpc_call *call) { - parent_call_t *p = (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm); +static parent_call *get_or_create_parent_call(grpc_call *call) { + parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); if (p == NULL) { - p = (parent_call_t *)gpr_arena_alloc(call->arena, sizeof(*p)); + p = (parent_call *)gpr_arena_alloc(call->arena, sizeof(*p)); gpr_mu_init(&p->child_list_mu); if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) { gpr_mu_destroy(&p->child_list_mu); - p = (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm); + p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); } } return p; } -static parent_call_t *get_parent_call(grpc_call *call) { - return (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm); +static parent_call *get_parent_call(grpc_call *call) { + return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); } grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, @@ -377,24 +377,24 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, bool immediately_cancel = false; - if (args->parent_call != NULL) { - child_call_t *cc = call->child_call = - (child_call_t *)gpr_arena_alloc(arena, sizeof(child_call_t)); - call->child_call->parent = args->parent_call; + if (args->parent != NULL) { + child_call *cc = call->child = + (child_call *)gpr_arena_alloc(arena, sizeof(child_call)); + call->child->parent = args->parent; - GRPC_CALL_INTERNAL_REF(args->parent_call, "child"); + GRPC_CALL_INTERNAL_REF(args->parent, "child"); GPR_ASSERT(call->is_client); - GPR_ASSERT(!args->parent_call->is_client); + GPR_ASSERT(!args->parent->is_client); - parent_call_t *pc = get_or_create_parent_call(args->parent_call); + parent_call *pc = get_or_create_parent_call(args->parent); gpr_mu_lock(&pc->child_list_mu); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( gpr_convert_clock_type(send_deadline, - args->parent_call->send_deadline.clock_type), - args->parent_call->send_deadline); + args->parent->send_deadline.clock_type), + args->parent->send_deadline); } /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with * GRPC_PROPAGATE_STATS_CONTEXT */ @@ -406,9 +406,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, "Census tracing propagation requested " "without Census context propagation")); } - grpc_call_context_set( - call, GRPC_CONTEXT_TRACING, - args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL); + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, + args->parent->context[GRPC_CONTEXT_TRACING].value, + NULL); } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) { add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Census context propagation requested " @@ -416,7 +416,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, } if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; - if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) { + if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) { immediately_cancel = true; } } @@ -426,9 +426,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, cc->sibling_next = cc->sibling_prev = call; } else { cc->sibling_next = pc->first_child; - cc->sibling_prev = pc->first_child->child_call->sibling_prev; - cc->sibling_next->child_call->sibling_prev = - cc->sibling_prev->child_call->sibling_next = call; + cc->sibling_prev = pc->first_child->child->sibling_prev; + cc->sibling_next->child->sibling_prev = + cc->sibling_prev->child->sibling_next = call; } gpr_mu_unlock(&pc->child_list_mu); @@ -533,7 +533,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - parent_call_t *pc = get_parent_call(c); + parent_call *pc = get_parent_call(c); if (pc != NULL) { gpr_mu_destroy(&pc->child_list_mu); } @@ -570,14 +570,14 @@ void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); } void grpc_call_unref(grpc_call *c) { if (!gpr_unref(&c->ext_ref)) return; - child_call_t *cc = c->child_call; + child_call *cc = c->child; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_call_unref", 0); GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c)); if (cc) { - parent_call_t *pc = get_parent_call(cc->parent); + parent_call *pc = get_parent_call(cc->parent); gpr_mu_lock(&pc->child_list_mu); if (c == pc->first_child) { pc->first_child = cc->sibling_next; @@ -585,8 +585,8 @@ void grpc_call_unref(grpc_call *c) { pc->first_child = NULL; } } - cc->sibling_prev->child_call->sibling_next = cc->sibling_next; - cc->sibling_next->child_call->sibling_prev = cc->sibling_prev; + cc->sibling_prev->child->sibling_next = cc->sibling_next; + cc->sibling_next->child->sibling_prev = cc->sibling_prev; gpr_mu_unlock(&pc->child_list_mu); GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child"); } @@ -1309,14 +1309,14 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); - parent_call_t *pc = get_parent_call(call); + parent_call *pc = get_parent_call(call); if (pc != NULL) { grpc_call *child; gpr_mu_lock(&pc->child_list_mu); child = pc->first_child; if (child != NULL) { do { - next_child_call = child->child_call->sibling_next; + next_child_call = child->child->sibling_next; if (child->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE, diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index d537637cbb..c680139cf6 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -37,7 +37,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx, typedef struct grpc_call_create_args { grpc_channel *channel; - grpc_call *parent_call; + grpc_call *parent; uint32_t propagation_mask; grpc_completion_queue *cq; diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 2f9b677c85..48962e5e45 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -282,7 +282,7 @@ static grpc_call *grpc_channel_create_call_internal( grpc_call_create_args args; memset(&args, 0, sizeof(args)); args.channel = channel; - args.parent_call = parent_call; + args.parent = parent_call; args.propagation_mask = propagation_mask; args.cq = cq; args.pollset_set_alternative = pollset_set_alternative; |