diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_poll_posix.cc')
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.cc | 199 |
1 files changed, 88 insertions, 111 deletions
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index e32e1ba42a..006e3ddd2f 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -128,8 +128,7 @@ static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset, MUST NOT be called with a pollset lock taken if got_read or got_write are 1, also does the become_{readable,writable} as appropriate. */ -static void fd_end_poll(grpc_exec_ctx* exec_ctx, grpc_fd_watcher* rec, - int got_read, int got_write, +static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write, grpc_pollset* read_notifier_pollset); /* Return 1 if this fd is orphaned, 0 otherwise */ @@ -186,11 +185,9 @@ struct grpc_pollset { }; /* Add an fd to a pollset */ -static void pollset_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, - struct grpc_fd* fd); +static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd); -static void pollset_set_add_fd(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, grpc_fd* fd); +static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd); /* Convert a timespec to milliseconds: - very small or negative poll times are clamped to zero to do a @@ -199,8 +196,7 @@ static void pollset_set_add_fd(grpc_exec_ctx* exec_ctx, - longer than a millisecond polls are rounded up to the next nearest millisecond to avoid spinning - infinite timeouts are converted to -1 */ -static int poll_deadline_to_millis_timeout(grpc_exec_ctx* exec_ctx, - grpc_millis deadline); +static int poll_deadline_to_millis_timeout(grpc_millis deadline); /* Allow kick to wakeup the currently polling worker */ #define GRPC_POLLSET_CAN_KICK_SELF 1 @@ -208,7 +204,7 @@ static int poll_deadline_to_millis_timeout(grpc_exec_ctx* exec_ctx, #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 /* As per pollset_kick, with an extended set of flags (defined above) -- mostly for fd_posix's use. */ -static grpc_error* pollset_kick_ext(grpc_exec_ctx* exec_ctx, grpc_pollset* p, +static grpc_error* pollset_kick_ext(grpc_pollset* p, grpc_pollset_worker* specific_worker, uint32_t flags) GRPC_MUST_USE_RESULT; @@ -353,8 +349,7 @@ static bool fd_is_orphaned(grpc_fd* fd) { } /* Return the read-notifier pollset */ -static grpc_pollset* fd_get_read_notifier_pollset(grpc_exec_ctx* exec_ctx, - grpc_fd* fd) { +static grpc_pollset* fd_get_read_notifier_pollset(grpc_fd* fd) { grpc_pollset* notifier = nullptr; gpr_mu_lock(&fd->mu); @@ -364,39 +359,36 @@ static grpc_pollset* fd_get_read_notifier_pollset(grpc_exec_ctx* exec_ctx, return notifier; } -static grpc_error* pollset_kick_locked(grpc_exec_ctx* exec_ctx, - grpc_fd_watcher* watcher) { +static grpc_error* pollset_kick_locked(grpc_fd_watcher* watcher) { gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); - grpc_error* err = - pollset_kick_ext(exec_ctx, watcher->pollset, watcher->worker, - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + grpc_error* err = pollset_kick_ext(watcher->pollset, watcher->worker, + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); gpr_mu_unlock(&watcher->pollset->mu); return err; } -static void maybe_wake_one_watcher_locked(grpc_exec_ctx* exec_ctx, - grpc_fd* fd) { +static void maybe_wake_one_watcher_locked(grpc_fd* fd) { if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) { - pollset_kick_locked(exec_ctx, fd->inactive_watcher_root.next); + pollset_kick_locked(fd->inactive_watcher_root.next); } else if (fd->read_watcher) { - pollset_kick_locked(exec_ctx, fd->read_watcher); + pollset_kick_locked(fd->read_watcher); } else if (fd->write_watcher) { - pollset_kick_locked(exec_ctx, fd->write_watcher); + pollset_kick_locked(fd->write_watcher); } } -static void wake_all_watchers_locked(grpc_exec_ctx* exec_ctx, grpc_fd* fd) { +static void wake_all_watchers_locked(grpc_fd* fd) { grpc_fd_watcher* watcher; for (watcher = fd->inactive_watcher_root.next; watcher != &fd->inactive_watcher_root; watcher = watcher->next) { - pollset_kick_locked(exec_ctx, watcher); + pollset_kick_locked(watcher); } if (fd->read_watcher) { - pollset_kick_locked(exec_ctx, fd->read_watcher); + pollset_kick_locked(fd->read_watcher); } if (fd->write_watcher && fd->write_watcher != fd->read_watcher) { - pollset_kick_locked(exec_ctx, fd->write_watcher); + pollset_kick_locked(fd->write_watcher); } } @@ -405,12 +397,12 @@ static int has_watchers(grpc_fd* fd) { fd->inactive_watcher_root.next != &fd->inactive_watcher_root; } -static void close_fd_locked(grpc_exec_ctx* exec_ctx, grpc_fd* fd) { +static void close_fd_locked(grpc_fd* fd) { fd->closed = 1; if (!fd->released) { close(fd->fd); } - GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_NONE); } static int fd_wrapped_fd(grpc_fd* fd) { @@ -421,8 +413,7 @@ static int fd_wrapped_fd(grpc_fd* fd) { } } -static void fd_orphan(grpc_exec_ctx* exec_ctx, grpc_fd* fd, - grpc_closure* on_done, int* release_fd, +static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, bool already_closed, const char* reason) { fd->on_done_closure = on_done; fd->released = release_fd != nullptr; @@ -435,9 +426,9 @@ static void fd_orphan(grpc_exec_ctx* exec_ctx, grpc_fd* fd, gpr_mu_lock(&fd->mu); REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ if (!has_watchers(fd)) { - close_fd_locked(exec_ctx, fd); + close_fd_locked(fd); } else { - wake_all_watchers_locked(exec_ctx, fd); + wake_all_watchers_locked(fd); } gpr_mu_unlock(&fd->mu); UNREF_BY(fd, 2, reason); /* drop the reference */ @@ -469,10 +460,10 @@ static grpc_error* fd_shutdown_error(grpc_fd* fd) { } } -static void notify_on_locked(grpc_exec_ctx* exec_ctx, grpc_fd* fd, - grpc_closure** st, grpc_closure* closure) { +static void notify_on_locked(grpc_fd* fd, grpc_closure** st, + grpc_closure* closure) { if (fd->shutdown) { - GRPC_CLOSURE_SCHED(exec_ctx, closure, + GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown")); } else if (*st == CLOSURE_NOT_READY) { /* not ready ==> switch to a waiting state by setting the closure */ @@ -480,8 +471,8 @@ static void notify_on_locked(grpc_exec_ctx* exec_ctx, grpc_fd* fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - GRPC_CLOSURE_SCHED(exec_ctx, closure, fd_shutdown_error(fd)); - maybe_wake_one_watcher_locked(exec_ctx, fd); + GRPC_CLOSURE_SCHED(closure, fd_shutdown_error(fd)); + maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ gpr_log(GPR_ERROR, @@ -492,8 +483,7 @@ static void notify_on_locked(grpc_exec_ctx* exec_ctx, grpc_fd* fd, } /* returns 1 if state becomes not ready */ -static int set_ready_locked(grpc_exec_ctx* exec_ctx, grpc_fd* fd, - grpc_closure** st) { +static int set_ready_locked(grpc_fd* fd, grpc_closure** st) { if (*st == CLOSURE_READY) { /* duplicate ready ==> ignore */ return 0; @@ -503,18 +493,18 @@ static int set_ready_locked(grpc_exec_ctx* exec_ctx, grpc_fd* fd, return 0; } else { /* waiting ==> queue closure */ - GRPC_CLOSURE_SCHED(exec_ctx, *st, fd_shutdown_error(fd)); + GRPC_CLOSURE_SCHED(*st, fd_shutdown_error(fd)); *st = CLOSURE_NOT_READY; return 1; } } static void set_read_notifier_pollset_locked( - grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_pollset* read_notifier_pollset) { + grpc_fd* fd, grpc_pollset* read_notifier_pollset) { fd->read_notifier_pollset = read_notifier_pollset; } -static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) { +static void fd_shutdown(grpc_fd* fd, grpc_error* why) { gpr_mu_lock(&fd->mu); /* only shutdown once */ if (!fd->shutdown) { @@ -522,8 +512,8 @@ static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) { fd->shutdown_error = why; /* signal read/write closed to OS so that future operations fail */ shutdown(fd->fd, SHUT_RDWR); - set_ready_locked(exec_ctx, fd, &fd->read_closure); - set_ready_locked(exec_ctx, fd, &fd->write_closure); + set_ready_locked(fd, &fd->read_closure); + set_ready_locked(fd, &fd->write_closure); } else { GRPC_ERROR_UNREF(why); } @@ -537,17 +527,15 @@ static bool fd_is_shutdown(grpc_fd* fd) { return r; } -static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd, - grpc_closure* closure) { +static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) { gpr_mu_lock(&fd->mu); - notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); + notify_on_locked(fd, &fd->read_closure, closure); gpr_mu_unlock(&fd->mu); } -static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd, - grpc_closure* closure) { +static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { gpr_mu_lock(&fd->mu); - notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); + notify_on_locked(fd, &fd->write_closure, closure); gpr_mu_unlock(&fd->mu); } @@ -602,8 +590,7 @@ static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset, return mask; } -static void fd_end_poll(grpc_exec_ctx* exec_ctx, grpc_fd_watcher* watcher, - int got_read, int got_write, +static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write, grpc_pollset* read_notifier_pollset) { int was_polling = 0; int kick = 0; @@ -637,23 +624,23 @@ static void fd_end_poll(grpc_exec_ctx* exec_ctx, grpc_fd_watcher* watcher, watcher->prev->next = watcher->next; } if (got_read) { - if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) { + if (set_ready_locked(fd, &fd->read_closure)) { kick = 1; } if (read_notifier_pollset != nullptr) { - set_read_notifier_pollset_locked(exec_ctx, fd, read_notifier_pollset); + set_read_notifier_pollset_locked(fd, read_notifier_pollset); } } if (got_write) { - if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) { + if (set_ready_locked(fd, &fd->write_closure)) { kick = 1; } } if (kick) { - maybe_wake_one_watcher_locked(exec_ctx, fd); + maybe_wake_one_watcher_locked(fd); } if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) { - close_fd_locked(exec_ctx, fd); + close_fd_locked(fd); } gpr_mu_unlock(&fd->mu); @@ -714,12 +701,12 @@ static void kick_append_error(grpc_error** composite, grpc_error* error) { *composite = grpc_error_add_child(*composite, error); } -static grpc_error* pollset_kick_ext(grpc_exec_ctx* exec_ctx, grpc_pollset* p, +static grpc_error* pollset_kick_ext(grpc_pollset* p, grpc_pollset_worker* specific_worker, uint32_t flags) { GPR_TIMER_BEGIN("pollset_kick_ext", 0); grpc_error* error = GRPC_ERROR_NONE; - GRPC_STATS_INC_POLLSET_KICK(exec_ctx); + GRPC_STATS_INC_POLLSET_KICK(); /* pollset->mu already held */ if (specific_worker != nullptr) { @@ -785,9 +772,9 @@ static grpc_error* pollset_kick_ext(grpc_exec_ctx* exec_ctx, grpc_pollset* p, return error; } -static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* p, +static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* specific_worker) { - return pollset_kick_ext(exec_ctx, p, specific_worker, 0); + return pollset_kick_ext(p, specific_worker, 0); } /* global state management */ @@ -821,7 +808,7 @@ static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { pollset->pollset_set_count = 0; } -static void pollset_destroy(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset) { +static void pollset_destroy(grpc_pollset* pollset) { GPR_ASSERT(!pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); while (pollset->local_wakeup_cache) { @@ -834,8 +821,7 @@ static void pollset_destroy(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset) { gpr_mu_destroy(&pollset->mu); } -static void pollset_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, - grpc_fd* fd) { +static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) { gpr_mu_lock(&pollset->mu); size_t i; /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ @@ -850,19 +836,19 @@ static void pollset_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, } pollset->fds[pollset->fd_count++] = fd; GRPC_FD_REF(fd, "multipoller"); - pollset_kick(exec_ctx, pollset, nullptr); + pollset_kick(pollset, nullptr); exit: gpr_mu_unlock(&pollset->mu); } -static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset) { +static void finish_shutdown(grpc_pollset* pollset) { GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); size_t i; for (i = 0; i < pollset->fd_count; i++) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } pollset->fd_count = 0; - GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(pollset->shutdown_done, GRPC_ERROR_NONE); } static void work_combine_error(grpc_error** composite, grpc_error* error) { @@ -873,7 +859,7 @@ static void work_combine_error(grpc_error** composite, grpc_error* error) { *composite = grpc_error_add_child(*composite, error); } -static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, +static grpc_error* pollset_work(grpc_pollset* pollset, grpc_pollset_worker** worker_hdl, grpc_millis deadline) { grpc_pollset_worker worker; @@ -912,7 +898,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, if (!pollset_has_workers(pollset) && !grpc_closure_list_empty(pollset->idle_jobs)) { GPR_TIMER_MARK("pollset_work.idle_jobs", 0); - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs); + GRPC_CLOSURE_LIST_SCHED(&pollset->idle_jobs); goto done; } /* If we're shutting down then we don't execute any extended work */ @@ -944,7 +930,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_fd_watcher* watchers; struct pollfd* pfds; - timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); + timeout = poll_deadline_to_millis_timeout(deadline); if (pollset->fd_count + 2 <= inline_elements) { pfds = pollfd_space; @@ -988,9 +974,9 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid even going into the blocking annotation if possible */ GRPC_SCHEDULING_START_BLOCKING_REGION; - GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); + GRPC_STATS_INC_SYSCALL_POLL(); r = grpc_poll_function(pfds, pfd_count, timeout); - GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); + GRPC_SCHEDULING_END_BLOCKING_REGION; if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r); @@ -1003,16 +989,16 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, for (i = 1; i < pfd_count; i++) { if (watchers[i].fd == nullptr) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0, nullptr); + fd_end_poll(&watchers[i], 0, 0, nullptr); } else { // Wake up all the file descriptors, if we have an invalid one // we can identify it on the next pollset_work() - fd_end_poll(exec_ctx, &watchers[i], 1, 1, pollset); + fd_end_poll(&watchers[i], 1, 1, pollset); } } } else if (r == 0) { for (i = 1; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0, nullptr); + fd_end_poll(&watchers[i], 0, 0, nullptr); } } else { if (pfds[0].revents & POLLIN_CHECK) { @@ -1024,14 +1010,14 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, } for (i = 1; i < pfd_count; i++) { if (watchers[i].fd == nullptr) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0, nullptr); + fd_end_poll(&watchers[i], 0, 0, nullptr); } else { if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "%p got_event: %d r:%d w:%d [%d]", pollset, pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0, (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents); } - fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, + fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK, pfds[i].revents & POLLOUT_CHECK, pollset); } } @@ -1054,7 +1040,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, worker list, which means nobody could ask us to re-evaluate polling). */ done: if (!locked) { - queued_work |= grpc_exec_ctx_flush(exec_ctx); + queued_work |= grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&pollset->mu); locked = 1; } @@ -1083,21 +1069,21 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, /* check shutdown conditions */ if (pollset->shutting_down) { if (pollset_has_workers(pollset)) { - pollset_kick(exec_ctx, pollset, nullptr); + pollset_kick(pollset, nullptr); } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); - finish_shutdown(exec_ctx, pollset); - grpc_exec_ctx_flush(exec_ctx); + finish_shutdown(pollset); + grpc_core::ExecCtx::Get()->Flush(); /* Continuing to access pollset here is safe -- it is the caller's * responsibility to not destroy when it has outstanding calls to * pollset_work. * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ gpr_mu_lock(&pollset->mu); } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs); + GRPC_CLOSURE_LIST_SCHED(&pollset->idle_jobs); gpr_mu_unlock(&pollset->mu); - grpc_exec_ctx_flush(exec_ctx); + grpc_core::ExecCtx::Get()->Flush(); gpr_mu_lock(&pollset->mu); } } @@ -1107,26 +1093,24 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, return error; } -static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, - grpc_closure* closure) { +static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { GPR_ASSERT(!pollset->shutting_down); pollset->shutting_down = 1; pollset->shutdown_done = closure; - pollset_kick(exec_ctx, pollset, GRPC_POLLSET_KICK_BROADCAST); + pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset_has_workers(pollset)) { - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs); + GRPC_CLOSURE_LIST_SCHED(&pollset->idle_jobs); } if (!pollset->called_shutdown && !pollset_has_observers(pollset)) { pollset->called_shutdown = 1; - finish_shutdown(exec_ctx, pollset); + finish_shutdown(pollset); } } -static int poll_deadline_to_millis_timeout(grpc_exec_ctx* exec_ctx, - grpc_millis deadline) { +static int poll_deadline_to_millis_timeout(grpc_millis deadline) { if (deadline == GRPC_MILLIS_INF_FUTURE) return -1; if (deadline == 0) return 0; - grpc_millis n = deadline - grpc_exec_ctx_now(exec_ctx); + grpc_millis n = deadline - grpc_core::ExecCtx::Get()->Now(); if (n < 0) return 0; if (n > INT_MAX) return -1; return (int)n; @@ -1143,8 +1127,7 @@ static grpc_pollset_set* pollset_set_create(void) { return pollset_set; } -static void pollset_set_destroy(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set) { +static void pollset_set_destroy(grpc_pollset_set* pollset_set) { size_t i; gpr_mu_destroy(&pollset_set->mu); for (i = 0; i < pollset_set->fd_count; i++) { @@ -1159,7 +1142,7 @@ static void pollset_set_destroy(grpc_exec_ctx* exec_ctx, !pollset_has_observers(pollset)) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); - finish_shutdown(exec_ctx, pollset); + finish_shutdown(pollset); } else { gpr_mu_unlock(&pollset->mu); } @@ -1170,8 +1153,7 @@ static void pollset_set_destroy(grpc_exec_ctx* exec_ctx, gpr_free(pollset_set); } -static void pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, +static void pollset_set_add_pollset(grpc_pollset_set* pollset_set, grpc_pollset* pollset) { size_t i, j; gpr_mu_lock(&pollset->mu); @@ -1190,7 +1172,7 @@ static void pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, if (fd_is_orphaned(pollset_set->fds[i])) { GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); } else { - pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); + pollset_add_fd(pollset, pollset_set->fds[i]); pollset_set->fds[j++] = pollset_set->fds[i]; } } @@ -1198,8 +1180,7 @@ static void pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, gpr_mu_unlock(&pollset_set->mu); } -static void pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, +static void pollset_set_del_pollset(grpc_pollset_set* pollset_set, grpc_pollset* pollset) { size_t i; gpr_mu_lock(&pollset_set->mu); @@ -1219,14 +1200,13 @@ static void pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, !pollset_has_observers(pollset)) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); - finish_shutdown(exec_ctx, pollset); + finish_shutdown(pollset); } else { gpr_mu_unlock(&pollset->mu); } } -static void pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* bag, +static void pollset_set_add_pollset_set(grpc_pollset_set* bag, grpc_pollset_set* item) { size_t i, j; gpr_mu_lock(&bag->mu); @@ -1241,7 +1221,7 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx, if (fd_is_orphaned(bag->fds[i])) { GRPC_FD_UNREF(bag->fds[i], "pollset_set"); } else { - pollset_set_add_fd(exec_ctx, item, bag->fds[i]); + pollset_set_add_fd(item, bag->fds[i]); bag->fds[j++] = bag->fds[i]; } } @@ -1249,8 +1229,7 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx, gpr_mu_unlock(&bag->mu); } -static void pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* bag, +static void pollset_set_del_pollset_set(grpc_pollset_set* bag, grpc_pollset_set* item) { size_t i; gpr_mu_lock(&bag->mu); @@ -1265,8 +1244,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx, gpr_mu_unlock(&bag->mu); } -static void pollset_set_add_fd(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, grpc_fd* fd) { +static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { size_t i; gpr_mu_lock(&pollset_set->mu); if (pollset_set->fd_count == pollset_set->fd_capacity) { @@ -1277,16 +1255,15 @@ static void pollset_set_add_fd(grpc_exec_ctx* exec_ctx, GRPC_FD_REF(fd, "pollset_set"); pollset_set->fds[pollset_set->fd_count++] = fd; for (i = 0; i < pollset_set->pollset_count; i++) { - pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); + pollset_add_fd(pollset_set->pollsets[i], fd); } for (i = 0; i < pollset_set->pollset_set_count; i++) { - pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + pollset_set_add_fd(pollset_set->pollset_sets[i], fd); } gpr_mu_unlock(&pollset_set->mu); } -static void pollset_set_del_fd(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, grpc_fd* fd) { +static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { size_t i; gpr_mu_lock(&pollset_set->mu); for (i = 0; i < pollset_set->fd_count; i++) { @@ -1299,7 +1276,7 @@ static void pollset_set_del_fd(grpc_exec_ctx* exec_ctx, } } for (i = 0; i < pollset_set->pollset_set_count; i++) { - pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + pollset_set_del_fd(pollset_set->pollset_sets[i], fd); } gpr_mu_unlock(&pollset_set->mu); } |