diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.cc')
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 402 |
1 files changed, 203 insertions, 199 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 0809d574a9..aafdd690c7 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -86,21 +86,21 @@ struct pollable { grpc_wakeup_fd wakeup; // only for type fd... one ref to the owner fd - grpc_fd *owner_fd; + grpc_fd* owner_fd; - grpc_pollset_set *pollset_set; - pollable *next; - pollable *prev; + grpc_pollset_set* pollset_set; + pollable* next; + pollable* prev; gpr_mu mu; - grpc_pollset_worker *root_worker; + grpc_pollset_worker* root_worker; int event_cursor; int event_count; struct epoll_event events[MAX_EPOLL_EVENTS]; }; -static const char *pollable_type_string(pollable_type t) { +static const char* pollable_type_string(pollable_type t) { switch (t) { case PO_MULTI: return "pollset"; @@ -112,8 +112,8 @@ static const char *pollable_type_string(pollable_type t) { return "<invalid>"; } -static char *pollable_desc(pollable *p) { - char *out; +static char* pollable_desc(pollable* p) { + char* out; gpr_asprintf(&out, "type=%s epfd=%d wakeup=%d", pollable_type_string(p->type), p->epfd, p->wakeup.read_fd); return out; @@ -121,17 +121,17 @@ static char *pollable_desc(pollable *p) { /// Shared empty pollable - used by pollset to poll on until the first fd is /// added -static pollable *g_empty_pollable; +static pollable* g_empty_pollable; -static grpc_error *pollable_create(pollable_type type, pollable **p); +static grpc_error* pollable_create(pollable_type type, pollable** p); #ifdef NDEBUG -static pollable *pollable_ref(pollable *p); -static void pollable_unref(pollable *p); +static pollable* pollable_ref(pollable* p); +static void pollable_unref(pollable* p); #define POLLABLE_REF(p, r) pollable_ref(p) #define POLLABLE_UNREF(p, r) pollable_unref(p) #else -static pollable *pollable_ref(pollable *p, int line, const char *reason); -static void pollable_unref(pollable *p, int line, const char *reason); +static pollable* pollable_ref(pollable* p, int line, const char* reason); +static void pollable_unref(pollable* p, int line, const char* reason); #define POLLABLE_REF(p, r) pollable_ref((p), __LINE__, (r)) #define POLLABLE_UNREF(p, r) pollable_unref((p), __LINE__, (r)) #endif @@ -151,13 +151,13 @@ struct grpc_fd { gpr_mu orphan_mu; gpr_mu pollable_mu; - pollable *pollable_obj; + pollable* pollable_obj; gpr_atm read_closure; gpr_atm write_closure; - struct grpc_fd *freelist_next; - grpc_closure *on_done_closure; + struct grpc_fd* freelist_next; + grpc_closure* on_done_closure; /* The pollset that last noticed that the fd is readable. The actual type * stored in this is (grpc_pollset *) */ @@ -174,8 +174,8 @@ static void fd_global_shutdown(void); */ typedef struct { - grpc_pollset_worker *next; - grpc_pollset_worker *prev; + grpc_pollset_worker* next; + grpc_pollset_worker* prev; } pwlink; typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks; @@ -188,18 +188,18 @@ struct grpc_pollset_worker { pid_t originator; #endif gpr_cv cv; - grpc_pollset *pollset; - pollable *pollable_obj; + grpc_pollset* pollset; + pollable* pollable_obj; pwlink links[PWLINK_COUNT]; }; struct grpc_pollset { gpr_mu mu; - pollable *active_pollable; + pollable* active_pollable; bool kicked_without_poller; - grpc_closure *shutdown_closure; - grpc_pollset_worker *root_worker; + grpc_closure* shutdown_closure; + grpc_pollset_worker* root_worker; int containing_pollset_set_count; }; @@ -210,23 +210,23 @@ struct grpc_pollset { struct grpc_pollset_set { gpr_refcount refs; gpr_mu mu; - grpc_pollset_set *parent; + grpc_pollset_set* parent; size_t pollset_count; size_t pollset_capacity; - grpc_pollset **pollsets; + grpc_pollset** pollsets; size_t fd_count; size_t fd_capacity; - grpc_fd **fds; + grpc_fd** fds; }; /******************************************************************************* * Common helpers */ -static bool append_error(grpc_error **composite, grpc_error *error, - const char *desc) { +static bool append_error(grpc_error** composite, grpc_error* error, + const char* desc) { if (error == GRPC_ERROR_NONE) return true; if (*composite == GRPC_ERROR_NONE) { *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc); @@ -252,14 +252,14 @@ static bool append_error(grpc_error **composite, grpc_error *error, * becomes a spurious read notification on a reused fd. */ -static grpc_fd *fd_freelist = NULL; +static grpc_fd* fd_freelist = NULL; static gpr_mu fd_freelist_mu; #ifndef NDEBUG #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__) #define UNREF_BY(ec, fd, n, reason) \ unref_by(ec, fd, n, reason, __FILE__, __LINE__) -static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, +static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file, int line) { if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { gpr_log(GPR_DEBUG, @@ -270,13 +270,13 @@ static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file, #else #define REF_BY(fd, n, reason) ref_by(fd, n) #define UNREF_BY(ec, fd, n, reason) unref_by(ec, fd, n) -static void ref_by(grpc_fd *fd, int n) { +static void ref_by(grpc_fd* fd, int n) { #endif GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0); } -static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_fd *fd = (grpc_fd *)arg; +static void fd_destroy(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_fd* fd = (grpc_fd*)arg; /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); @@ -293,8 +293,8 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } #ifndef NDEBUG -static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n, - const char *reason, const char *file, int line) { +static void unref_by(grpc_exec_ctx* exec_ctx, grpc_fd* fd, int n, + const char* reason, const char* file, int line) { if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) { gpr_log(GPR_DEBUG, "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]", @@ -302,13 +302,14 @@ static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n, gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line); } #else -static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n) { +static void unref_by(grpc_exec_ctx* exec_ctx, grpc_fd* fd, int n) { #endif gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { - GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(fd_destroy, fd, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + exec_ctx, + GRPC_CLOSURE_CREATE(fd_destroy, fd, grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } else { GPR_ASSERT(old > n); } @@ -320,15 +321,15 @@ static void fd_global_shutdown(void) { gpr_mu_lock(&fd_freelist_mu); gpr_mu_unlock(&fd_freelist_mu); while (fd_freelist != NULL) { - grpc_fd *fd = fd_freelist; + grpc_fd* fd = fd_freelist; fd_freelist = fd_freelist->freelist_next; gpr_free(fd); } gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd *fd_create(int fd, const char *name) { - grpc_fd *new_fd = NULL; +static grpc_fd* fd_create(int fd, const char* name) { + grpc_fd* new_fd = NULL; gpr_mu_lock(&fd_freelist_mu); if (fd_freelist != NULL) { @@ -338,7 +339,7 @@ static grpc_fd *fd_create(int fd, const char *name) { gpr_mu_unlock(&fd_freelist_mu); if (new_fd == NULL) { - new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); + new_fd = (grpc_fd*)gpr_malloc(sizeof(grpc_fd)); } gpr_mu_init(&new_fd->pollable_mu); @@ -353,7 +354,7 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd->freelist_next = NULL; new_fd->on_done_closure = NULL; - char *fd_name; + char* fd_name; gpr_asprintf(&fd_name, "%s fd=%d", name, fd); grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name); #ifndef NDEBUG @@ -365,14 +366,14 @@ static grpc_fd *fd_create(int fd, const char *name) { return new_fd; } -static int fd_wrapped_fd(grpc_fd *fd) { +static int fd_wrapped_fd(grpc_fd* fd) { int ret_fd = fd->fd; return (gpr_atm_acq_load(&fd->refst) & 1) ? ret_fd : -1; } -static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure *on_done, int *release_fd, - bool already_closed, const char *reason) { +static void fd_orphan(grpc_exec_ctx* exec_ctx, grpc_fd* fd, + grpc_closure* on_done, int* release_fd, + bool already_closed, const char* reason) { bool is_fd_closed = already_closed; gpr_mu_lock(&fd->orphan_mu); @@ -403,18 +404,18 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ } -static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, - grpc_fd *fd) { +static grpc_pollset* fd_get_read_notifier_pollset(grpc_exec_ctx* exec_ctx, + grpc_fd* fd) { gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset); - return (grpc_pollset *)notifier; + return (grpc_pollset*)notifier; } -static bool fd_is_shutdown(grpc_fd *fd) { +static bool fd_is_shutdown(grpc_fd* fd) { return grpc_lfev_is_shutdown(&fd->read_closure); } /* Might be called multiple times */ -static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { +static void fd_shutdown(grpc_exec_ctx* exec_ctx, grpc_fd* fd, grpc_error* why) { if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure, GRPC_ERROR_REF(why))) { shutdown(fd->fd, SHUT_RDWR); @@ -423,13 +424,13 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { GRPC_ERROR_UNREF(why); } -static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure *closure) { +static void fd_notify_on_read(grpc_exec_ctx* exec_ctx, grpc_fd* fd, + grpc_closure* closure) { grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read"); } -static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_closure *closure) { +static void fd_notify_on_write(grpc_exec_ctx* exec_ctx, grpc_fd* fd, + grpc_closure* closure) { grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write"); } @@ -437,15 +438,15 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, * Pollable Definitions */ -static grpc_error *pollable_create(pollable_type type, pollable **p) { +static grpc_error* pollable_create(pollable_type type, pollable** p) { *p = NULL; int epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd == -1) { return GRPC_OS_ERROR(errno, "epoll_create1"); } - *p = (pollable *)gpr_malloc(sizeof(**p)); - grpc_error *err = grpc_wakeup_fd_init(&(*p)->wakeup); + *p = (pollable*)gpr_malloc(sizeof(**p)); + grpc_error* err = grpc_wakeup_fd_init(&(*p)->wakeup); if (err != GRPC_ERROR_NONE) { close(epfd); gpr_free(*p); @@ -454,7 +455,7 @@ static grpc_error *pollable_create(pollable_type type, pollable **p) { } struct epoll_event ev; ev.events = (uint32_t)(EPOLLIN | EPOLLET); - ev.data.ptr = (void *)(1 | (intptr_t) & (*p)->wakeup); + ev.data.ptr = (void*)(1 | (intptr_t) & (*p)->wakeup); if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) { err = GRPC_OS_ERROR(errno, "epoll_ctl"); close(epfd); @@ -478,9 +479,9 @@ static grpc_error *pollable_create(pollable_type type, pollable **p) { } #ifdef NDEBUG -static pollable *pollable_ref(pollable *p) { +static pollable* pollable_ref(pollable* p) { #else -static pollable *pollable_ref(pollable *p, int line, const char *reason) { +static pollable* pollable_ref(pollable* p, int line, const char* reason) { if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) { int r = (int)gpr_atm_no_barrier_load(&p->refs.count); gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG, @@ -492,9 +493,9 @@ static pollable *pollable_ref(pollable *p, int line, const char *reason) { } #ifdef NDEBUG -static void pollable_unref(pollable *p) { +static void pollable_unref(pollable* p) { #else -static void pollable_unref(pollable *p, int line, const char *reason) { +static void pollable_unref(pollable* p, int line, const char* reason) { if (p == NULL) return; if (GRPC_TRACER_ON(grpc_trace_pollable_refcount)) { int r = (int)gpr_atm_no_barrier_load(&p->refs.count); @@ -509,9 +510,9 @@ static void pollable_unref(pollable *p, int line, const char *reason) { } } -static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { - grpc_error *error = GRPC_ERROR_NONE; - static const char *err_desc = "pollable_add_fd"; +static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { + grpc_error* error = GRPC_ERROR_NONE; + static const char* err_desc = "pollable_add_fd"; const int epfd = p->epfd; if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -541,7 +542,7 @@ GPR_TLS_DECL(g_current_thread_pollset); GPR_TLS_DECL(g_current_thread_worker); /* Global state management */ -static grpc_error *pollset_global_init(void) { +static grpc_error* pollset_global_init(void) { gpr_tls_init(&g_current_thread_pollset); gpr_tls_init(&g_current_thread_worker); return pollable_create(PO_EMPTY, &g_empty_pollable); @@ -554,8 +555,8 @@ static void pollset_global_shutdown(void) { } /* pollset->mu must be held while calling this function */ -static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset) { +static void pollset_maybe_finish_shutdown(grpc_exec_ctx* exec_ctx, + grpc_pollset* pollset) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) " @@ -573,9 +574,9 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, /* pollset->mu must be held before calling this function, * pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be * held */ -static grpc_error *kick_one_worker(grpc_exec_ctx *exec_ctx, - grpc_pollset_worker *specific_worker) { - pollable *p = specific_worker->pollable_obj; +static grpc_error* kick_one_worker(grpc_exec_ctx* exec_ctx, + grpc_pollset_worker* specific_worker) { + pollable* p = specific_worker->pollable_obj; grpc_core::mu_guard lock(&p->mu); GPR_ASSERT(specific_worker != NULL); if (specific_worker->kicked) { @@ -599,7 +600,7 @@ static grpc_error *kick_one_worker(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p); } specific_worker->kicked = true; - grpc_error *error = grpc_wakeup_fd_wakeup(&p->wakeup); + grpc_error* error = grpc_wakeup_fd_wakeup(&p->wakeup); return error; } if (specific_worker->initialized_cv) { @@ -616,16 +617,15 @@ static grpc_error *kick_one_worker(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *specific_worker) { +static grpc_error* pollset_kick(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, + grpc_pollset_worker* specific_worker) { GRPC_STATS_INC_POLLSET_KICK(exec_ctx); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p", pollset, specific_worker, - (void *)gpr_tls_get(&g_current_thread_pollset), - (void *)gpr_tls_get(&g_current_thread_worker), - pollset->root_worker); + (void*)gpr_tls_get(&g_current_thread_pollset), + (void*)gpr_tls_get(&g_current_thread_worker), pollset->root_worker); } if (specific_worker == NULL) { if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { @@ -667,11 +667,11 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } -static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset) { - grpc_error *error = GRPC_ERROR_NONE; - const char *err_desc = "pollset_kick_all"; - grpc_pollset_worker *w = pollset->root_worker; +static grpc_error* pollset_kick_all(grpc_exec_ctx* exec_ctx, + grpc_pollset* pollset) { + grpc_error* error = GRPC_ERROR_NONE; + const char* err_desc = "pollset_kick_all"; + grpc_pollset_worker* w = pollset->root_worker; if (w != NULL) { do { GRPC_STATS_INC_POLLSET_KICK(exec_ctx); @@ -682,13 +682,13 @@ static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx, return error; } -static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { +static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { gpr_mu_init(&pollset->mu); pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset"); *mu = &pollset->mu; } -static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, +static int poll_deadline_to_millis_timeout(grpc_exec_ctx* exec_ctx, grpc_millis millis) { if (millis == GRPC_MILLIS_INF_FUTURE) return -1; grpc_millis delta = millis - grpc_exec_ctx_now(exec_ctx); @@ -700,8 +700,8 @@ static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, return (int)delta; } -static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - grpc_pollset *notifier) { +static void fd_become_readable(grpc_exec_ctx* exec_ctx, grpc_fd* fd, + grpc_pollset* notifier) { grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read"); /* Note, it is possible that fd_become_readable might be called twice with @@ -713,14 +713,14 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier); } -static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { +static void fd_become_writable(grpc_exec_ctx* exec_ctx, grpc_fd* fd) { grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } -static grpc_error *fd_get_or_become_pollable(grpc_fd *fd, pollable **p) { +static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) { gpr_mu_lock(&fd->pollable_mu); - grpc_error *error = GRPC_ERROR_NONE; - static const char *err_desc = "fd_get_or_become_pollable"; + grpc_error* error = GRPC_ERROR_NONE; + static const char* err_desc = "fd_get_or_become_pollable"; if (fd->pollable_obj == NULL) { if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj), err_desc)) { @@ -744,35 +744,35 @@ static grpc_error *fd_get_or_become_pollable(grpc_fd *fd, pollable **p) { } /* pollset->po.mu lock must be held by the caller before calling this */ -static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_closure *closure) { +static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, + grpc_closure* closure) { GPR_ASSERT(pollset->shutdown_closure == NULL); pollset->shutdown_closure = closure; GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset)); pollset_maybe_finish_shutdown(exec_ctx, pollset); } -static grpc_error *pollable_process_events(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - pollable *pollable_obj, bool drain) { - static const char *err_desc = "pollset_process_events"; - grpc_error *error = GRPC_ERROR_NONE; +static grpc_error* pollable_process_events(grpc_exec_ctx* exec_ctx, + grpc_pollset* pollset, + pollable* pollable_obj, bool drain) { + static const char* err_desc = "pollset_process_events"; + grpc_error* error = GRPC_ERROR_NONE; for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && pollable_obj->event_cursor != pollable_obj->event_count; i++) { int n = pollable_obj->event_cursor++; - struct epoll_event *ev = &pollable_obj->events[n]; - void *data_ptr = ev->data.ptr; + struct epoll_event* ev = &pollable_obj->events[n]; + void* data_ptr = ev->data.ptr; if (1 & (intptr_t)data_ptr) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p got pollset_wakeup %p", pollset, data_ptr); } append_error(&error, grpc_wakeup_fd_consume_wakeup( - (grpc_wakeup_fd *)((~(intptr_t)1) & (intptr_t)data_ptr)), + (grpc_wakeup_fd*)((~(intptr_t)1) & (intptr_t)data_ptr)), err_desc); } else { - grpc_fd *fd = (grpc_fd *)data_ptr; + grpc_fd* fd = (grpc_fd*)data_ptr; bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; @@ -795,17 +795,17 @@ static grpc_error *pollable_process_events(grpc_exec_ctx *exec_ctx, } /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ -static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { +static void pollset_destroy(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset) { POLLABLE_UNREF(pollset->active_pollable, "pollset"); pollset->active_pollable = NULL; } -static grpc_error *pollable_epoll(grpc_exec_ctx *exec_ctx, pollable *p, +static grpc_error* pollable_epoll(grpc_exec_ctx* exec_ctx, pollable* p, grpc_millis deadline) { int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (GRPC_TRACER_ON(grpc_polling_trace)) { - char *desc = pollable_desc(p); + char* desc = pollable_desc(p); gpr_log(GPR_DEBUG, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout); gpr_free(desc); } @@ -835,8 +835,8 @@ static grpc_error *pollable_epoll(grpc_exec_ctx *exec_ctx, pollable *p, } /* Return true if first in list */ -static bool worker_insert(grpc_pollset_worker **root_worker, - grpc_pollset_worker *worker, pwlinks link) { +static bool worker_insert(grpc_pollset_worker** root_worker, + grpc_pollset_worker* worker, pwlinks link) { if (*root_worker == NULL) { *root_worker = worker; worker->links[link].next = worker->links[link].prev = worker; @@ -853,8 +853,8 @@ static bool worker_insert(grpc_pollset_worker **root_worker, /* returns the new root IFF the root changed */ typedef enum { WRR_NEW_ROOT, WRR_EMPTIED, WRR_REMOVED } worker_remove_result; -static worker_remove_result worker_remove(grpc_pollset_worker **root_worker, - grpc_pollset_worker *worker, +static worker_remove_result worker_remove(grpc_pollset_worker** root_worker, + grpc_pollset_worker* worker, pwlinks link) { if (worker == *root_worker) { if (worker == worker->links[link].next) { @@ -874,9 +874,9 @@ static worker_remove_result worker_remove(grpc_pollset_worker **root_worker, } /* Return true if this thread should poll */ -static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, - grpc_pollset_worker **worker_hdl, +static bool begin_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, + grpc_pollset_worker* worker, + grpc_pollset_worker** worker_hdl, grpc_millis deadline) { bool do_poll = (pollset->shutdown_closure == nullptr); if (worker_hdl != NULL) *worker_hdl = worker; @@ -927,16 +927,16 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, return do_poll; } -static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, - grpc_pollset_worker **worker_hdl) { +static void end_worker(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, + grpc_pollset_worker* worker, + grpc_pollset_worker** worker_hdl) { gpr_mu_lock(&pollset->mu); gpr_mu_lock(&worker->pollable_obj->mu); switch (worker_remove(&worker->pollable_obj->root_worker, worker, PWLINK_POLLABLE)) { case WRR_NEW_ROOT: { // wakeup new poller - grpc_pollset_worker *new_root = worker->pollable_obj->root_worker; + grpc_pollset_worker* new_root = worker->pollable_obj->root_worker; GPR_ASSERT(new_root->initialized_cv); gpr_cv_signal(&new_root->cv); break; @@ -969,12 +969,12 @@ static long gettid(void) { return syscall(__NR_gettid); } The function pollset_work() may temporarily release the lock (pollset->po.mu) during the course of its execution but it will always re-acquire the lock and ensure that it is held by the time the function returns */ -static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker_hdl, +static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, + grpc_pollset_worker** worker_hdl, grpc_millis deadline) { #ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP - grpc_pollset_worker *worker = - (grpc_pollset_worker *)gpr_malloc(sizeof(*worker)); + grpc_pollset_worker* worker = + (grpc_pollset_worker*)gpr_malloc(sizeof(*worker)); #define WORKER_PTR (worker) #else grpc_pollset_worker worker; @@ -984,13 +984,14 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, WORKER_PTR->originator = gettid(); #endif if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR - " deadline=%" PRIdPTR " kwp=%d pollable=%p", + gpr_log(GPR_DEBUG, + "PS:%p work hdl=%p worker=%p now=%" PRIdPTR " deadline=%" PRIdPTR + " kwp=%d pollable=%p", pollset, worker_hdl, WORKER_PTR, grpc_exec_ctx_now(exec_ctx), deadline, pollset->kicked_without_poller, pollset->active_pollable); } - static const char *err_desc = "pollset_work"; - grpc_error *error = GRPC_ERROR_NONE; + static const char* err_desc = "pollset_work"; + grpc_error* error = GRPC_ERROR_NONE; if (pollset->kicked_without_poller) { pollset->kicked_without_poller = false; } else { @@ -999,9 +1000,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR); if (WORKER_PTR->pollable_obj->event_cursor == WORKER_PTR->pollable_obj->event_count) { - append_error(&error, pollable_epoll(exec_ctx, WORKER_PTR->pollable_obj, - deadline), - err_desc); + append_error( + &error, + pollable_epoll(exec_ctx, WORKER_PTR->pollable_obj, deadline), + err_desc); } append_error(&error, pollable_process_events(exec_ctx, pollset, @@ -1020,10 +1022,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, return error; } -static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked( - grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - static const char *err_desc = "pollset_transition_pollable_from_empty_to_fd"; - grpc_error *error = GRPC_ERROR_NONE; +static grpc_error* pollset_transition_pollable_from_empty_to_fd_locked( + grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_fd* fd) { + static const char* err_desc = "pollset_transition_pollable_from_empty_to_fd"; + grpc_error* error = GRPC_ERROR_NONE; if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p (%d); transition pollable from empty to fd", @@ -1036,10 +1038,10 @@ static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked( return error; } -static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked( - grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *and_add_fd) { - static const char *err_desc = "pollset_transition_pollable_from_fd_to_multi"; - grpc_error *error = GRPC_ERROR_NONE; +static grpc_error* pollset_transition_pollable_from_fd_to_multi_locked( + grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, grpc_fd* and_add_fd) { + static const char* err_desc = "pollset_transition_pollable_from_fd_to_multi"; + grpc_error* error = GRPC_ERROR_NONE; if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log( GPR_DEBUG, @@ -1048,7 +1050,7 @@ static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked( pollset->active_pollable->owner_fd); } append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); - grpc_fd *initial_fd = pollset->active_pollable->owner_fd; + grpc_fd* initial_fd = pollset->active_pollable->owner_fd; POLLABLE_UNREF(pollset->active_pollable, "pollset"); pollset->active_pollable = NULL; if (append_error(&error, pollable_create(PO_MULTI, &pollset->active_pollable), @@ -1065,10 +1067,10 @@ static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked( } /* expects pollsets locked, flag whether fd is locked or not */ -static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, grpc_fd *fd) { - grpc_error *error = GRPC_ERROR_NONE; - pollable *po_at_start = +static grpc_error* pollset_add_fd_locked(grpc_exec_ctx* exec_ctx, + grpc_pollset* pollset, grpc_fd* fd) { + grpc_error* error = GRPC_ERROR_NONE; + pollable* po_at_start = POLLABLE_REF(pollset->active_pollable, "pollset_add_fd"); switch (pollset->active_pollable->type) { case PO_EMPTY: @@ -1102,11 +1104,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, return error; } -static grpc_error *pollset_as_multipollable_locked(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - pollable **pollable_obj) { - grpc_error *error = GRPC_ERROR_NONE; - pollable *po_at_start = +static grpc_error* pollset_as_multipollable_locked(grpc_exec_ctx* exec_ctx, + grpc_pollset* pollset, + pollable** pollable_obj) { + grpc_error* error = GRPC_ERROR_NONE; + pollable* po_at_start = POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable"); switch (pollset->active_pollable->type) { case PO_EMPTY: @@ -1139,10 +1141,10 @@ static grpc_error *pollset_as_multipollable_locked(grpc_exec_ctx *exec_ctx, return error; } -static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd) { +static void pollset_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset* pollset, + grpc_fd* fd) { gpr_mu_lock(&pollset->mu); - grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd); + grpc_error* error = pollset_add_fd_locked(exec_ctx, pollset, fd); gpr_mu_unlock(&pollset->mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); } @@ -1151,7 +1153,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * Pollset-set Definitions */ -static grpc_pollset_set *pss_lock_adam(grpc_pollset_set *pss) { +static grpc_pollset_set* pss_lock_adam(grpc_pollset_set* pss) { gpr_mu_lock(&pss->mu); while (pss->parent != NULL) { gpr_mu_unlock(&pss->mu); @@ -1161,14 +1163,14 @@ static grpc_pollset_set *pss_lock_adam(grpc_pollset_set *pss) { return pss; } -static grpc_pollset_set *pollset_set_create(void) { - grpc_pollset_set *pss = (grpc_pollset_set *)gpr_zalloc(sizeof(*pss)); +static grpc_pollset_set* pollset_set_create(void) { + grpc_pollset_set* pss = (grpc_pollset_set*)gpr_zalloc(sizeof(*pss)); gpr_mu_init(&pss->mu); gpr_ref_init(&pss->refs, 1); return pss; } -static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) { +static void pollset_set_unref(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss) { if (pss == NULL) return; if (!gpr_unref(&pss->refs)) return; pollset_set_unref(exec_ctx, pss->parent); @@ -1188,13 +1190,13 @@ static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) { gpr_free(pss); } -static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, - grpc_fd *fd) { +static void pollset_set_add_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss, + grpc_fd* fd) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd); } - grpc_error *error = GRPC_ERROR_NONE; - static const char *err_desc = "pollset_set_add_fd"; + grpc_error* error = GRPC_ERROR_NONE; + static const char* err_desc = "pollset_set_add_fd"; pss = pss_lock_adam(pss); for (size_t i = 0; i < pss->pollset_count; i++) { append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd), @@ -1203,7 +1205,7 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, if (pss->fd_count == pss->fd_capacity) { pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8); pss->fds = - (grpc_fd **)gpr_realloc(pss->fds, pss->fd_capacity * sizeof(*pss->fds)); + (grpc_fd**)gpr_realloc(pss->fds, pss->fd_capacity * sizeof(*pss->fds)); } REF_BY(fd, 2, "pollset_set"); pss->fds[pss->fd_count++] = fd; @@ -1212,8 +1214,8 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, GRPC_LOG_IF_ERROR(err_desc, error); } -static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, - grpc_fd *fd) { +static void pollset_set_del_fd(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pss, + grpc_fd* fd) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd); } @@ -1233,8 +1235,8 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, gpr_mu_unlock(&pss->mu); } -static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) { +static void pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* pss, grpc_pollset* ps) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps); } @@ -1260,12 +1262,12 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, // add all fds to pollables, and output a new array of unorphaned out_fds // assumes pollsets are multipollable -static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds, - size_t fd_count, grpc_pollset **pollsets, +static grpc_error* add_fds_to_pollsets(grpc_exec_ctx* exec_ctx, grpc_fd** fds, + size_t fd_count, grpc_pollset** pollsets, size_t pollset_count, - const char *err_desc, grpc_fd **out_fds, - size_t *out_fd_count) { - grpc_error *error = GRPC_ERROR_NONE; + const char* err_desc, grpc_fd** out_fds, + size_t* out_fd_count) { + grpc_error* error = GRPC_ERROR_NONE; for (size_t i = 0; i < fd_count; i++) { gpr_mu_lock(&fds[i]->orphan_mu); if ((gpr_atm_no_barrier_load(&fds[i]->refst) & 1) == 0) { @@ -1284,14 +1286,14 @@ static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds, return error; } -static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) { +static void pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* pss, grpc_pollset* ps) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps); } - grpc_error *error = GRPC_ERROR_NONE; - static const char *err_desc = "pollset_set_add_pollset"; - pollable *pollable_obj = NULL; + grpc_error* error = GRPC_ERROR_NONE; + static const char* err_desc = "pollset_set_add_pollset"; + pollable* pollable_obj = NULL; gpr_mu_lock(&ps->mu); if (!GRPC_LOG_IF_ERROR(err_desc, pollset_as_multipollable_locked( exec_ctx, ps, &pollable_obj))) { @@ -1310,7 +1312,7 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, err_desc); if (pss->pollset_count == pss->pollset_capacity) { pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8); - pss->pollsets = (grpc_pollset **)gpr_realloc( + pss->pollsets = (grpc_pollset**)gpr_realloc( pss->pollsets, pss->pollset_capacity * sizeof(*pss->pollsets)); } pss->pollsets[pss->pollset_count++] = ps; @@ -1320,24 +1322,24 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, GRPC_LOG_IF_ERROR(err_desc, error); } -static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *a, - grpc_pollset_set *b) { +static void pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* a, + grpc_pollset_set* b) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b); } - grpc_error *error = GRPC_ERROR_NONE; - static const char *err_desc = "pollset_set_add_fd"; + grpc_error* error = GRPC_ERROR_NONE; + static const char* err_desc = "pollset_set_add_fd"; for (;;) { if (a == b) { // pollset ancestors are the same: nothing to do return; } if (a > b) { - GPR_SWAP(grpc_pollset_set *, a, b); + GPR_SWAP(grpc_pollset_set*, a, b); } - gpr_mu *a_mu = &a->mu; - gpr_mu *b_mu = &b->mu; + gpr_mu* a_mu = &a->mu; + gpr_mu* b_mu = &b->mu; gpr_mu_lock(a_mu); gpr_mu_lock(b_mu); if (a->parent != NULL) { @@ -1355,7 +1357,7 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, const size_t a_size = a->fd_count + a->pollset_count; const size_t b_size = b->fd_count + b->pollset_count; if (b_size > a_size) { - GPR_SWAP(grpc_pollset_set *, a, b); + GPR_SWAP(grpc_pollset_set*, a, b); } if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PSS: parent %p to %p", b, a); @@ -1364,22 +1366,24 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, b->parent = a; if (a->fd_capacity < a->fd_count + b->fd_count) { a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count); - a->fds = (grpc_fd **)gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds)); + a->fds = (grpc_fd**)gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds)); } size_t initial_a_fd_count = a->fd_count; a->fd_count = 0; - append_error(&error, add_fds_to_pollsets(exec_ctx, a->fds, initial_a_fd_count, - b->pollsets, b->pollset_count, - "merge_a2b", a->fds, &a->fd_count), - err_desc); - append_error(&error, add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count, - a->pollsets, a->pollset_count, - "merge_b2a", a->fds, &a->fd_count), - err_desc); + append_error( + &error, + add_fds_to_pollsets(exec_ctx, a->fds, initial_a_fd_count, b->pollsets, + b->pollset_count, "merge_a2b", a->fds, &a->fd_count), + err_desc); + append_error( + &error, + add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count, a->pollsets, + a->pollset_count, "merge_b2a", a->fds, &a->fd_count), + err_desc); if (a->pollset_capacity < a->pollset_count + b->pollset_count) { a->pollset_capacity = GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count); - a->pollsets = (grpc_pollset **)gpr_realloc( + a->pollsets = (grpc_pollset**)gpr_realloc( a->pollsets, a->pollset_capacity * sizeof(*a->pollsets)); } if (b->pollset_count > 0) { @@ -1396,9 +1400,9 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&b->mu); } -static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} +static void pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* bag, + grpc_pollset_set* item) {} /******************************************************************************* * Event engine binding @@ -1440,7 +1444,7 @@ static const grpc_event_engine_vtable vtable = { shutdown_engine, }; -const grpc_event_engine_vtable *grpc_init_epollex_linux( +const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { if (!explicitly_requested) { return NULL; @@ -1474,7 +1478,7 @@ const grpc_event_engine_vtable *grpc_init_epollex_linux( #include "src/core/lib/iomgr/ev_epollex_linux.h" /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ -const grpc_event_engine_vtable *grpc_init_epollex_linux( +const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { return NULL; } |