diff options
author | Craig Tiller <ctiller@google.com> | 2017-10-18 14:47:18 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-10-18 14:47:18 -0700 |
commit | d3d709a10fe2d3d9c2f2115bd4adad33e5a1ad0c (patch) | |
tree | 4a72ee6263f95fc39db944aab3699bc9f15922b8 /src/core/lib | |
parent | cdaf6d8e0660f83b708e4e73e90a4deaeab7ea2b (diff) | |
parent | dcd7e80fdaf29b3fd081e7ac9df708d4c562eb79 (diff) |
Merge github.com:grpc/grpc into tracer
Diffstat (limited to 'src/core/lib')
19 files changed, 1072 insertions, 826 deletions
diff --git a/src/core/lib/http/httpcli_security_connector.cc b/src/core/lib/http/httpcli_security_connector.cc index ef6c4a509b..d832dacb69 100644 --- a/src/core/lib/http/httpcli_security_connector.cc +++ b/src/core/lib/http/httpcli_security_connector.cc @@ -91,8 +91,17 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, tsi_peer_destruct(&peer); } +static int httpcli_ssl_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + grpc_httpcli_ssl_channel_security_connector *c1 = + (grpc_httpcli_ssl_channel_security_connector *)sc1; + grpc_httpcli_ssl_channel_security_connector *c2 = + (grpc_httpcli_ssl_channel_security_connector *)sc2; + return strcmp(c1->secure_peer_name, c2->secure_peer_name); +} + static grpc_security_connector_vtable httpcli_ssl_vtable = { - httpcli_ssl_destroy, httpcli_ssl_check_peer}; + httpcli_ssl_destroy, httpcli_ssl_check_peer, httpcli_ssl_cmp}; static grpc_security_status httpcli_ssl_channel_security_connector_create( grpc_exec_ctx *exec_ctx, const char *pem_root_certs, @@ -123,6 +132,10 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( *sc = NULL; return GRPC_SECURITY_ERROR; } + // We don't actually need a channel credentials object in this case, + // but we set it to a non-NULL address so that we don't trigger + // assertions in grpc_channel_security_connector_cmp(). + c->base.channel_creds = (grpc_channel_credentials *)1; c->base.add_handshakers = httpcli_ssl_add_handshakers; *sc = &c->base; return GRPC_SECURITY_OK; diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 58e5392c47..d30ca3fd5e 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -30,6 +30,7 @@ #include <pthread.h> #include <string.h> #include <sys/socket.h> +#include <sys/syscall.h> #include <unistd.h> #include <grpc/support/alloc.h> @@ -49,100 +50,96 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/spinlock.h" -/******************************************************************************* - * Polling object - */ -typedef enum { - PO_POLLING_GROUP, - PO_POLLSET_SET, - PO_POLLSET, - PO_FD, - /* ordering is important: we always want to lock pollsets before fds: - this guarantees that using an fd as a pollable is safe */ - PO_EMPTY_POLLABLE, - PO_COUNT -} polling_obj_type; - -typedef struct polling_obj polling_obj; -typedef struct polling_group polling_group; - -struct polling_obj { - gpr_mu mu; - polling_obj_type type; - polling_group *group; - struct polling_obj *next; - struct polling_obj *prev; -}; - -struct polling_group { - polling_obj po; - gpr_refcount refs; -}; +// debug aid: create workers on the heap (allows asan to spot +// use-after-destruction) +//#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1 -static void po_init(polling_obj *po, polling_obj_type type); -static void po_destroy(polling_obj *po); -static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b); -static int po_cmp(polling_obj *a, polling_obj *b); +#define MAX_EPOLL_EVENTS 100 +#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 -static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po, - size_t initial_po_count); -static polling_group *pg_ref(polling_group *pg); -static void pg_unref(polling_group *pg); -static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a, - polling_group *b); -static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, - polling_obj *po); +#ifndef NDEBUG +grpc_core::Tracer grpc_trace_pollable_refcount(false, "pollable_refcount"); +#endif /******************************************************************************* * pollable Declarations */ -typedef struct pollable { - polling_obj po; +typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; + +typedef struct pollable pollable; + +/// A pollable is something that can be polled: it has an epoll set to poll on, +/// and a wakeup fd for kicks +/// There are three broad types: +/// - PO_EMPTY - the empty pollable, used before file descriptors are added to +/// a pollset +/// - PO_FD - a pollable containing only one FD - used to optimize single-fd +/// pollsets (which are common with synchronous api usage) +/// - PO_MULTI - a pollable containing many fds +struct pollable { + pollable_type type; // immutable + gpr_refcount refs; + int epfd; grpc_wakeup_fd wakeup; + + // only for type fd... one ref to the owner fd + grpc_fd *owner_fd; + + grpc_pollset_set *pollset_set; + pollable *next; + pollable *prev; + + gpr_mu mu; grpc_pollset_worker *root_worker; -} pollable; -static const char *polling_obj_type_string(polling_obj_type t) { + int event_cursor; + int event_count; + struct epoll_event events[MAX_EPOLL_EVENTS]; +}; + +static const char *pollable_type_string(pollable_type t) { switch (t) { - case PO_POLLING_GROUP: - return "polling_group"; - case PO_POLLSET_SET: - return "pollset_set"; - case PO_POLLSET: + case PO_MULTI: return "pollset"; case PO_FD: return "fd"; - case PO_EMPTY_POLLABLE: - return "empty_pollable"; - case PO_COUNT: - return "<invalid:count>"; + case PO_EMPTY: + return "empty"; } return "<invalid>"; } static char *pollable_desc(pollable *p) { char *out; - gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d", - polling_obj_type_string(p->po.type), p->po.group, p->epfd, - p->wakeup.read_fd); + gpr_asprintf(&out, "type=%s epfd=%d wakeup=%d", pollable_type_string(p->type), + p->epfd, p->wakeup.read_fd); return out; } -static pollable g_empty_pollable; +/// Shared empty pollable - used by pollset to poll on until the first fd is +/// added +static pollable *g_empty_pollable; -static void pollable_init(pollable *p, polling_obj_type type); -static void pollable_destroy(pollable *p); -/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable *p); +static grpc_error *pollable_create(pollable_type type, pollable **p); +#ifdef NDEBUG +static pollable *pollable_ref(pollable *p); +static void pollable_unref(pollable *p); +#define POLLABLE_REF(p, r) pollable_ref(p) +#define POLLABLE_UNREF(p, r) pollable_unref(p) +#else +static pollable *pollable_ref(pollable *p, int line, const char *reason); +static void pollable_unref(pollable *p, int line, const char *reason); +#define POLLABLE_REF(p, r) pollable_ref((p), __LINE__, (r)) +#define POLLABLE_UNREF(p, r) pollable_unref((p), __LINE__, (r)) +#endif /******************************************************************************* * Fd Declarations */ struct grpc_fd { - pollable pollable_obj; int fd; /* refst format: bit 0 : 1=Active / 0=Orphaned @@ -150,11 +147,10 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; - /* The fd is either closed or we relinquished control of it. In either - cases, this indicates that the 'fd' on this structure is no longer - valid */ - gpr_mu orphaned_mu; - bool orphaned; + gpr_mu orphan_mu; + + gpr_mu pollable_mu; + pollable *pollable_obj; gpr_atm read_closure; gpr_atm write_closure; @@ -176,47 +172,52 @@ static void fd_global_shutdown(void); * Pollset Declarations */ -typedef struct pollset_worker_link { +typedef struct { grpc_pollset_worker *next; grpc_pollset_worker *prev; -} pollset_worker_link; +} pwlink; -typedef enum { - PWL_POLLSET, - PWL_POLLABLE, - POLLSET_WORKER_LINK_COUNT -} pollset_worker_links; +typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks; struct grpc_pollset_worker { bool kicked; bool initialized_cv; - pollset_worker_link links[POLLSET_WORKER_LINK_COUNT]; +#ifndef NDEBUG + // debug aid: which thread started this worker + pid_t originator; +#endif gpr_cv cv; grpc_pollset *pollset; pollable *pollable_obj; -}; -#define MAX_EPOLL_EVENTS 100 -#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 + pwlink links[PWLINK_COUNT]; +}; struct grpc_pollset { - pollable pollable_obj; - pollable *current_pollable_obj; - int kick_alls_pending; + gpr_mu mu; + pollable *active_pollable; bool kicked_without_poller; grpc_closure *shutdown_closure; grpc_pollset_worker *root_worker; - - int event_cursor; - int event_count; - struct epoll_event events[MAX_EPOLL_EVENTS]; + int containing_pollset_set_count; }; /******************************************************************************* * Pollset-set Declarations */ + struct grpc_pollset_set { - polling_obj po; + gpr_refcount refs; + gpr_mu mu; + grpc_pollset_set *parent; + + size_t pollset_count; + size_t pollset_capacity; + grpc_pollset **pollsets; + + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; }; /******************************************************************************* @@ -250,11 +251,6 @@ static bool append_error(grpc_error **composite, grpc_error *error, * becomes a spurious read notification on a reused fd. */ -/* The alarm system needs to be able to wakeup 'some poller' sometimes - * (specifically when a new alarm needs to be triggered earlier than the next - * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a - * case occurs. */ - static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; @@ -282,8 +278,9 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_fd *fd = (grpc_fd *)arg; /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); - pollable_destroy(&fd->pollable_obj); - gpr_mu_destroy(&fd->orphaned_mu); + POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); + gpr_mu_destroy(&fd->pollable_mu); + gpr_mu_destroy(&fd->orphan_mu); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; @@ -343,12 +340,11 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); } - pollable_init(&new_fd->pollable_obj, PO_FD); - + gpr_mu_init(&new_fd->pollable_mu); + gpr_mu_init(&new_fd->orphan_mu); + new_fd->pollable_obj = NULL; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; - gpr_mu_init(&new_fd->orphaned_mu); - new_fd->orphaned = false; grpc_lfev_init(&new_fd->read_closure); grpc_lfev_init(&new_fd->write_closure); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); @@ -369,24 +365,17 @@ static grpc_fd *fd_create(int fd, const char *name) { } static int fd_wrapped_fd(grpc_fd *fd) { - int ret_fd = -1; - gpr_mu_lock(&fd->orphaned_mu); - if (!fd->orphaned) { - ret_fd = fd->fd; - } - gpr_mu_unlock(&fd->orphaned_mu); - - return ret_fd; + int ret_fd = fd->fd; + return (gpr_atm_acq_load(&fd->refst) & 1) ? ret_fd : -1; } static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, int *release_fd, bool already_closed, const char *reason) { bool is_fd_closed = already_closed; - grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_lock(&fd->pollable_obj.po.mu); - gpr_mu_lock(&fd->orphaned_mu); + gpr_mu_lock(&fd->orphan_mu); + fd->on_done_closure = on_done; /* If release_fd is not NULL, we should be relinquishing control of the file @@ -398,8 +387,6 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, is_fd_closed = true; } - fd->orphaned = true; - if (!is_fd_closed) { gpr_log(GPR_DEBUG, "TODO: handle fd removal?"); } @@ -408,13 +395,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, to be alive (and not added to freelist) until the end of this function */ REF_BY(fd, 1, reason); - GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE); + + gpr_mu_unlock(&fd->orphan_mu); - gpr_mu_unlock(&fd->orphaned_mu); - gpr_mu_unlock(&fd->pollable_obj.po.mu); UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ - GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); - GRPC_ERROR_UNREF(error); } static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, @@ -451,63 +436,87 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, * Pollable Definitions */ -static void pollable_init(pollable *p, polling_obj_type type) { - po_init(&p->po, type); - p->root_worker = NULL; - p->epfd = -1; +static grpc_error *pollable_create(pollable_type type, pollable **p) { + *p = NULL; + + int epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd == -1) { + return GRPC_OS_ERROR(errno, "epoll_create1"); + } + *p = (pollable *)gpr_malloc(sizeof(**p)); + grpc_error *err = grpc_wakeup_fd_init(&(*p)->wakeup); + if (err != GRPC_ERROR_NONE) { + close(epfd); + gpr_free(*p); + *p = NULL; + return err; + } + struct epoll_event ev; + ev.events = (uint32_t)(EPOLLIN | EPOLLET); + ev.data.ptr = (void *)(1 | (intptr_t) & (*p)->wakeup); + if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) { + err = GRPC_OS_ERROR(errno, "epoll_ctl"); + close(epfd); + grpc_wakeup_fd_destroy(&(*p)->wakeup); + gpr_free(*p); + *p = NULL; + return err; + } + + (*p)->type = type; + gpr_ref_init(&(*p)->refs, 1); + gpr_mu_init(&(*p)->mu); + (*p)->epfd = epfd; + (*p)->owner_fd = NULL; + (*p)->pollset_set = NULL; + (*p)->next = (*p)->prev = *p; + (*p)->root_worker = NULL; + (*p)->event_cursor = 0; + (*p)->event_count = 0; + return GRPC_ERROR_NONE; } -static void pollable_destroy(pollable *p) { - po_destroy(&p->po); - if (p->epfd != -1) { - close(p->epfd); - grpc_wakeup_fd_destroy(&p->wakeup); +#ifdef NDEBUG +static pollable *pollable_ref(pollable *p) { +#else +static pollable *pollable_ref(pollable *p, int line, const char *reason) { + if (grpc_trace_pollable_refcount.enabled()) { + int r = (int)gpr_atm_no_barrier_load(&p->refs.count); + gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG, + "POLLABLE:%p ref %d->%d %s", p, r, r + 1, reason); } +#endif + gpr_ref(&p->refs); + return p; } -/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable *p) { - if (p->epfd == -1) { - int new_epfd = epoll_create1(EPOLL_CLOEXEC); - if (new_epfd < 0) { - return GRPC_OS_ERROR(errno, "epoll_create1"); - } - grpc_error *err = grpc_wakeup_fd_init(&p->wakeup); - if (err != GRPC_ERROR_NONE) { - close(new_epfd); - return err; - } - struct epoll_event ev; - ev.events = (uint32_t)(EPOLLIN | EPOLLET); - ev.data.ptr = (void *)(1 | (intptr_t)&p->wakeup); - if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) { - err = GRPC_OS_ERROR(errno, "epoll_ctl"); - close(new_epfd); - grpc_wakeup_fd_destroy(&p->wakeup); - return err; - } - - p->epfd = new_epfd; +#ifdef NDEBUG +static void pollable_unref(pollable *p) { +#else +static void pollable_unref(pollable *p, int line, const char *reason) { + if (p == NULL) return; + if (grpc_trace_pollable_refcount.enabled()) { + int r = (int)gpr_atm_no_barrier_load(&p->refs.count); + gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG, + "POLLABLE:%p unref %d->%d %s", p, r, r - 1, reason); + } +#endif + if (p != NULL && gpr_unref(&p->refs)) { + close(p->epfd); + grpc_wakeup_fd_destroy(&p->wakeup); + gpr_free(p); } - return GRPC_ERROR_NONE; } -/* pollable must be materialized */ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollable_add_fd"; const int epfd = p->epfd; - GPR_ASSERT(epfd != -1); if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } - gpr_mu_lock(&fd->orphaned_mu); - if (fd->orphaned) { - gpr_mu_unlock(&fd->orphaned_mu); - return GRPC_ERROR_NONE; - } struct epoll_event ev_fd; ev_fd.events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); ev_fd.data.ptr = fd; @@ -519,7 +528,6 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc); } } - gpr_mu_unlock(&fd->orphaned_mu); return error; } @@ -535,128 +543,67 @@ GPR_TLS_DECL(g_current_thread_worker); static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_pollset); gpr_tls_init(&g_current_thread_worker); - pollable_init(&g_empty_pollable, PO_EMPTY_POLLABLE); - return GRPC_ERROR_NONE; + return pollable_create(PO_EMPTY, &g_empty_pollable); } static void pollset_global_shutdown(void) { - pollable_destroy(&g_empty_pollable); + POLLABLE_UNREF(g_empty_pollable, "g_empty_pollable"); gpr_tls_destroy(&g_current_thread_pollset); gpr_tls_destroy(&g_current_thread_worker); } +/* pollset->mu must be held while calling this function */ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, + "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) " + "rw=%p (target:NULL) cpsc=%d (target:0)", + pollset, pollset->active_pollable, pollset->shutdown_closure, + pollset->root_worker, pollset->containing_pollset_set_count); + } if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && - pollset->kick_alls_pending == 0) { + pollset->containing_pollset_set_count == 0) { GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); pollset->shutdown_closure = NULL; } } -static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error_unused) { - grpc_error *error = GRPC_ERROR_NONE; - grpc_pollset *pollset = (grpc_pollset *)arg; - gpr_mu_lock(&pollset->pollable_obj.po.mu); - if (pollset->root_worker != NULL) { - grpc_pollset_worker *worker = pollset->root_worker; - do { - GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_lock(&worker->pollable_obj->po.mu); - } - if (worker->initialized_cv && worker != pollset->root_worker) { - if (grpc_polling_trace.enabled()) { - gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable_obj, - worker->pollable_obj); - } - worker->kicked = true; - gpr_cv_signal(&worker->cv); - } else { - if (grpc_polling_trace.enabled()) { - gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable_obj, - worker->pollable_obj); - } - append_error(&error, - grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup), - "pollset_shutdown"); - } - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker->pollable_obj->po.mu); - } - - worker = worker->links[PWL_POLLSET].next; - } while (worker != pollset->root_worker); - } - pollset->kick_alls_pending--; - pollset_maybe_finish_shutdown(exec_ctx, pollset); - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - GRPC_LOG_IF_ERROR("kick_all", error); -} - -static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollset->kick_alls_pending++; - GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(do_kick_all, pollset, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); -} - -static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, - grpc_pollset_worker *specific_worker) { - if (grpc_polling_trace.enabled()) { - gpr_log(GPR_DEBUG, - "PS:%p kick %p tls_pollset=%p tls_worker=%p " - "root_worker=(pollset:%p pollable:%p)", - p, specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset), - (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker, - p->root_worker); - } - if (specific_worker == NULL) { - if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { - if (pollset->root_worker == NULL) { - if (grpc_polling_trace.enabled()) { - gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", p); - } - pollset->kicked_without_poller = true; - return GRPC_ERROR_NONE; - } else { - if (grpc_polling_trace.enabled()) { - gpr_log(GPR_DEBUG, "PS:%p kicked_any_via_wakeup_fd", p); - } - grpc_error *err = pollable_materialize(p); - if (err != GRPC_ERROR_NONE) return err; - return grpc_wakeup_fd_wakeup(&p->wakeup); - } - } else { - if (grpc_polling_trace.enabled()) { - gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", p); - } - return GRPC_ERROR_NONE; - } - } else if (specific_worker->kicked) { +/* pollset->mu must be held before calling this function, + * pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be + * held */ +static grpc_error *kick_one_worker(grpc_exec_ctx *exec_ctx, + grpc_pollset_worker *specific_worker) { + pollable *p = specific_worker->pollable_obj; + grpc_core::mu_guard lock(&p->mu); + GRPC_STATS_INC_POLLSET_KICK(exec_ctx); + GPR_ASSERT(specific_worker != NULL); + if (specific_worker->kicked) { if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p); } + GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx); return GRPC_ERROR_NONE; - } else if (gpr_tls_get(&g_current_thread_worker) == - (intptr_t)specific_worker) { + } + if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) { if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p); } + GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx); specific_worker->kicked = true; return GRPC_ERROR_NONE; - } else if (specific_worker == p->root_worker) { + } + if (specific_worker == p->root_worker) { + GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx); if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p); } - grpc_error *err = pollable_materialize(p); - if (err != GRPC_ERROR_NONE) return err; specific_worker->kicked = true; - return grpc_wakeup_fd_wakeup(&p->wakeup); - } else { + grpc_error *error = grpc_wakeup_fd_wakeup(&p->wakeup); + return error; + } + if (specific_worker->initialized_cv) { + GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx); if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p); } @@ -664,30 +611,79 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, gpr_cv_signal(&specific_worker->cv); return GRPC_ERROR_NONE; } + // we can get here during end_worker after removing specific_worker from the + // pollable list but before removing it from the pollset list + return GRPC_ERROR_NONE; } -/* p->po.mu must be held before calling this function */ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { - pollable *p = pollset->current_pollable_obj; - GRPC_STATS_INC_POLLSET_KICK(exec_ctx); - if (p != &pollset->pollable_obj) { - gpr_mu_lock(&p->po.mu); + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, + "PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p", + pollset, specific_worker, + (void *)gpr_tls_get(&g_current_thread_pollset), + (void *)gpr_tls_get(&g_current_thread_worker), + pollset->root_worker); + } + if (specific_worker == NULL) { + if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) { + if (pollset->root_worker == NULL) { + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", pollset); + } + GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx); + pollset->kicked_without_poller = true; + return GRPC_ERROR_NONE; + } else { + // We've been asked to kick a poller, but we haven't been told which one + // ... any will do + // We look at the pollset worker list because: + // 1. the pollable list may include workers from other pollers, so we'd + // need to do an O(N) search + // 2. we'd additionally need to take the pollable lock, which we've so + // far avoided + // Now, we would prefer to wake a poller in cv_wait, and not in + // epoll_wait (since the latter would imply the need to do an additional + // wakeup) + // We know that if a worker is at the root of a pollable, it's (likely) + // also the root of a pollset, and we know that if a worker is NOT at + // the root of a pollset, it's (likely) not at the root of a pollable, + // so we take our chances and choose the SECOND worker enqueued against + // the pollset as a worker that's likely to be in cv_wait + return kick_one_worker( + exec_ctx, pollset->root_worker->links[PWLINK_POLLSET].next); + } + } else { + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", pollset); + } + GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx); + return GRPC_ERROR_NONE; + } + } else { + return kick_one_worker(exec_ctx, specific_worker); } - grpc_error *error = pollset_kick_inner(pollset, p, specific_worker); - if (p != &pollset->pollable_obj) { - gpr_mu_unlock(&p->po.mu); +} + +static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset) { + grpc_error *error = GRPC_ERROR_NONE; + const char *err_desc = "pollset_kick_all"; + grpc_pollset_worker *w = pollset->root_worker; + if (w != NULL) { + do { + append_error(&error, kick_one_worker(exec_ctx, w), err_desc); + w = w->links[PWLINK_POLLSET].next; + } while (w != pollset->root_worker); } return error; } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - pollable_init(&pollset->pollable_obj, PO_POLLSET); - pollset->current_pollable_obj = &g_empty_pollable; - pollset->kicked_without_poller = false; - pollset->shutdown_closure = NULL; - pollset->root_worker = NULL; - *mu = &pollset->pollable_obj.po.mu; + gpr_mu_init(&pollset->mu); + pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset"); + *mu = &pollset->mu; } static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, @@ -719,12 +715,29 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write"); } -static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { +static grpc_error *fd_get_or_become_pollable(grpc_fd *fd, pollable **p) { + gpr_mu_lock(&fd->pollable_mu); grpc_error *error = GRPC_ERROR_NONE; - static const char *err_desc = "fd_become_pollable"; - if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) { - append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc); + static const char *err_desc = "fd_get_or_become_pollable"; + if (fd->pollable_obj == NULL) { + if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj), + err_desc)) { + fd->pollable_obj->owner_fd = fd; + if (!append_error(&error, pollable_add_fd(fd->pollable_obj, fd), + err_desc)) { + POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); + fd->pollable_obj = NULL; + } + } } + if (error == GRPC_ERROR_NONE) { + GPR_ASSERT(fd->pollable_obj != NULL); + *p = POLLABLE_REF(fd->pollable_obj, "pollset"); + } else { + GPR_ASSERT(fd->pollable_obj == NULL); + *p = NULL; + } + gpr_mu_unlock(&fd->pollable_mu); return error; } @@ -733,23 +746,20 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(pollset->shutdown_closure == NULL); pollset->shutdown_closure = closure; - pollset_kick_all(exec_ctx, pollset); + GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset)); pollset_maybe_finish_shutdown(exec_ctx, pollset); } -static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { - return p != &g_empty_pollable && p != &pollset->pollable_obj; -} - -static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, bool drain) { +static grpc_error *pollable_process_events(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + pollable *pollable_obj, bool drain) { static const char *err_desc = "pollset_process_events"; grpc_error *error = GRPC_ERROR_NONE; for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && - pollset->event_cursor != pollset->event_count; + pollable_obj->event_cursor != pollable_obj->event_count; i++) { - int n = pollset->event_cursor++; - struct epoll_event *ev = &pollset->events[n]; + int n = pollable_obj->event_cursor++; + struct epoll_event *ev = &pollable_obj->events[n]; void *data_ptr = ev->data.ptr; if (1 & (intptr_t)data_ptr) { if (grpc_polling_trace.enabled()) { @@ -784,22 +794,17 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollable_destroy(&pollset->pollable_obj); - if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) { - UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2, - "pollset_pollable"); - } - GRPC_LOG_IF_ERROR("pollset_process_events", - pollset_process_events(exec_ctx, pollset, true)); + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = NULL; } -static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable *p, grpc_millis deadline) { +static grpc_error *pollable_epoll(grpc_exec_ctx *exec_ctx, pollable *p, + grpc_millis deadline) { int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (grpc_polling_trace.enabled()) { char *desc = pollable_desc(p); - gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout); + gpr_log(GPR_DEBUG, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout); gpr_free(desc); } @@ -809,7 +814,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int r; do { GRPC_STATS_INC_SYSCALL_POLL(exec_ctx); - r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout); + r = epoll_wait(p->epfd, p->events, MAX_EPOLL_EVENTS, timeout); } while (r < 0 && errno == EINTR); if (timeout != 0) { GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx); @@ -818,24 +823,24 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait"); if (grpc_polling_trace.enabled()) { - gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r); + gpr_log(GPR_DEBUG, "POLLABLE:%p got %d events", p, r); } - pollset->event_cursor = 0; - pollset->event_count = r; + p->event_cursor = 0; + p->event_count = r; return GRPC_ERROR_NONE; } /* Return true if first in list */ -static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link, - grpc_pollset_worker *worker) { - if (*root == NULL) { - *root = worker; +static bool worker_insert(grpc_pollset_worker **root_worker, + grpc_pollset_worker *worker, pwlinks link) { + if (*root_worker == NULL) { + *root_worker = worker; worker->links[link].next = worker->links[link].prev = worker; return true; } else { - worker->links[link].next = *root; + worker->links[link].next = *root_worker; worker->links[link].prev = worker->links[link].next->links[link].prev; worker->links[link].next->links[link].prev = worker; worker->links[link].prev->links[link].next = worker; @@ -843,26 +848,26 @@ static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link, } } -/* Return true if last in list */ -typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result; +/* returns the new root IFF the root changed */ +typedef enum { WRR_NEW_ROOT, WRR_EMPTIED, WRR_REMOVED } worker_remove_result; -static worker_remove_result worker_remove(grpc_pollset_worker **root, - pollset_worker_links link, - grpc_pollset_worker *worker) { - if (worker == *root) { +static worker_remove_result worker_remove(grpc_pollset_worker **root_worker, + grpc_pollset_worker *worker, + pwlinks link) { + if (worker == *root_worker) { if (worker == worker->links[link].next) { - *root = NULL; - return EMPTIED; + *root_worker = NULL; + return WRR_EMPTIED; } else { - *root = worker->links[link].next; + *root_worker = worker->links[link].next; worker->links[link].prev->links[link].next = worker->links[link].next; worker->links[link].next->links[link].prev = worker->links[link].prev; - return NEW_ROOT; + return WRR_NEW_ROOT; } } else { worker->links[link].prev->links[link].next = worker->links[link].next; worker->links[link].next->links[link].prev = worker->links[link].prev; - return REMOVED; + return WRR_REMOVED; } } @@ -871,25 +876,20 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl, grpc_millis deadline) { - bool do_poll = true; + bool do_poll = (pollset->shutdown_closure == nullptr); if (worker_hdl != NULL) *worker_hdl = worker; worker->initialized_cv = false; worker->kicked = false; worker->pollset = pollset; - worker->pollable_obj = pollset->current_pollable_obj; - - if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { - REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll"); - } - - worker_insert(&pollset->root_worker, PWL_POLLSET, worker); - if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE, - worker)) { + worker->pollable_obj = + POLLABLE_REF(pollset->active_pollable, "pollset_worker"); + worker_insert(&pollset->root_worker, worker, PWLINK_POLLSET); + gpr_mu_lock(&worker->pollable_obj->mu); + if (!worker_insert(&worker->pollable_obj->root_worker, worker, + PWLINK_POLLABLE)) { worker->initialized_cv = true; gpr_cv_init(&worker->cv); - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - } + gpr_mu_unlock(&pollset->mu); if (grpc_polling_trace.enabled() && worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, @@ -897,7 +897,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, poll_deadline_to_millis_timeout(exec_ctx, deadline)); } while (do_poll && worker->pollable_obj->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, + if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu, grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, @@ -916,158 +916,232 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker->pollable_obj, worker); } } - if (worker->pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker->pollable_obj->po.mu); - gpr_mu_lock(&pollset->pollable_obj.po.mu); - gpr_mu_lock(&worker->pollable_obj->po.mu); - } grpc_exec_ctx_invalidate_now(exec_ctx); + } else { + gpr_mu_unlock(&pollset->mu); } + gpr_mu_unlock(&worker->pollable_obj->mu); - return do_poll && pollset->shutdown_closure == NULL && - pollset->current_pollable_obj == worker->pollable_obj; + return do_poll; } static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { - if (NEW_ROOT == - worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) { - gpr_cv_signal(&worker->pollable_obj->root_worker->cv); + gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&worker->pollable_obj->mu); + switch (worker_remove(&worker->pollable_obj->root_worker, worker, + PWLINK_POLLABLE)) { + case WRR_NEW_ROOT: { + // wakeup new poller + grpc_pollset_worker *new_root = worker->pollable_obj->root_worker; + GPR_ASSERT(new_root->initialized_cv); + gpr_cv_signal(&new_root->cv); + break; + } + case WRR_EMPTIED: + if (pollset->active_pollable != worker->pollable_obj) { + // pollable no longer being polled: flush events + pollable_process_events(exec_ctx, pollset, worker->pollable_obj, true); + } + break; + case WRR_REMOVED: + break; + } + gpr_mu_unlock(&worker->pollable_obj->mu); + POLLABLE_UNREF(worker->pollable_obj, "pollset_worker"); + if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET) == + WRR_EMPTIED) { + pollset_maybe_finish_shutdown(exec_ctx, pollset); } if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } - if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { - UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll"); - } - if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) { - pollset_maybe_finish_shutdown(exec_ctx, pollset); - } } -/* pollset->po.mu lock must be held by the caller before calling this. +#ifndef NDEBUG +static long gettid(void) { return syscall(__NR_gettid); } +#endif + +/* pollset->mu lock must be held by the caller before calling this. The function pollset_work() may temporarily release the lock (pollset->po.mu) during the course of its execution but it will always re-acquire the lock and ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, grpc_millis deadline) { +#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP + grpc_pollset_worker *worker = + (grpc_pollset_worker *)gpr_malloc(sizeof(*worker)); +#define WORKER_PTR (worker) +#else grpc_pollset_worker worker; - if (0 && grpc_polling_trace.enabled()) { +#define WORKER_PTR (&worker) +#endif +#ifndef NDEBUG + WORKER_PTR->originator = gettid(); +#endif + if (grpc_polling_trace.enabled()) { gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR - " deadline=%" PRIdPTR " kwp=%d root_worker=%p", - pollset, worker_hdl, &worker, grpc_exec_ctx_now(exec_ctx), deadline, - pollset->kicked_without_poller, pollset->root_worker); + " deadline=%" PRIdPTR " kwp=%d pollable=%p", + pollset, worker_hdl, WORKER_PTR, grpc_exec_ctx_now(exec_ctx), + deadline, pollset->kicked_without_poller, pollset->active_pollable); } - grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollset_work"; + grpc_error *error = GRPC_ERROR_NONE; if (pollset->kicked_without_poller) { pollset->kicked_without_poller = false; - return GRPC_ERROR_NONE; - } - if (pollset->current_pollable_obj != &pollset->pollable_obj) { - gpr_mu_lock(&pollset->current_pollable_obj->po.mu); - } - if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) { - gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); - gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); - GPR_ASSERT(!pollset->shutdown_closure); - append_error(&error, pollable_materialize(worker.pollable_obj), err_desc); - if (worker.pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker.pollable_obj->po.mu); - } - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - if (pollset->event_cursor == pollset->event_count) { - append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj, - deadline), + } else { + if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) { + gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); + gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR); + if (WORKER_PTR->pollable_obj->event_cursor == + WORKER_PTR->pollable_obj->event_count) { + append_error(&error, pollable_epoll(exec_ctx, WORKER_PTR->pollable_obj, + deadline), + err_desc); + } + append_error(&error, + pollable_process_events(exec_ctx, pollset, + WORKER_PTR->pollable_obj, false), err_desc); + grpc_exec_ctx_flush(exec_ctx); + gpr_tls_set(&g_current_thread_pollset, 0); + gpr_tls_set(&g_current_thread_worker, 0); } - append_error(&error, pollset_process_events(exec_ctx, pollset, false), - err_desc); - gpr_mu_lock(&pollset->pollable_obj.po.mu); - if (worker.pollable_obj != &pollset->pollable_obj) { - gpr_mu_lock(&worker.pollable_obj->po.mu); - } - gpr_tls_set(&g_current_thread_pollset, 0); - gpr_tls_set(&g_current_thread_worker, 0); - pollset_maybe_finish_shutdown(exec_ctx, pollset); + end_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl); } - end_worker(exec_ctx, pollset, &worker, worker_hdl); - if (worker.pollable_obj != &pollset->pollable_obj) { - gpr_mu_unlock(&worker.pollable_obj->po.mu); - } - if (grpc_exec_ctx_has_work(exec_ctx)) { - gpr_mu_unlock(&pollset->pollable_obj.po.mu); - grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->pollable_obj.po.mu); +#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP + gpr_free(worker); +#endif +#undef WORKER_PTR + return error; +} + +static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { + static const char *err_desc = "pollset_transition_pollable_from_empty_to_fd"; + grpc_error *error = GRPC_ERROR_NONE; + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, + "PS:%p add fd %p (%d); transition pollable from empty to fd", + pollset, fd, fd->fd); } + append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + append_error(&error, fd_get_or_become_pollable(fd, &pollset->active_pollable), + err_desc); return error; } -static void unref_fd_no_longer_poller(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_fd *fd = (grpc_fd *)arg; - UNREF_BY(exec_ctx, fd, 2, "pollset_pollable"); +static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *and_add_fd) { + static const char *err_desc = "pollset_transition_pollable_from_fd_to_multi"; + grpc_error *error = GRPC_ERROR_NONE; + if (grpc_polling_trace.enabled()) { + gpr_log( + GPR_DEBUG, + "PS:%p add fd %p (%d); transition pollable from fd %p to multipoller", + pollset, and_add_fd, and_add_fd ? and_add_fd->fd : -1, + pollset->active_pollable->owner_fd); + } + append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc); + grpc_fd *initial_fd = pollset->active_pollable->owner_fd; + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = NULL; + if (append_error(&error, pollable_create(PO_MULTI, &pollset->active_pollable), + err_desc)) { + append_error(&error, pollable_add_fd(pollset->active_pollable, initial_fd), + err_desc); + if (and_add_fd != NULL) { + append_error(&error, + pollable_add_fd(pollset->active_pollable, and_add_fd), + err_desc); + } + } + return error; } /* expects pollsets locked, flag whether fd is locked or not */ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, grpc_fd *fd, - bool fd_locked) { - static const char *err_desc = "pollset_add_fd"; + grpc_pollset *pollset, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; - if (pollset->current_pollable_obj == &g_empty_pollable) { - if (grpc_polling_trace.enabled()) { - gpr_log(GPR_DEBUG, - "PS:%p add fd %p; transition pollable from empty to fd", pollset, - fd); - } - /* empty pollable --> single fd pollable */ - pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable_obj = &fd->pollable_obj; - if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu); - append_error(&error, fd_become_pollable_locked(fd), err_desc); - if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu); - REF_BY(fd, 2, "pollset_pollable"); - } else if (pollset->current_pollable_obj == &pollset->pollable_obj) { - if (grpc_polling_trace.enabled()) { - gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); - } - append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd), - err_desc); - } else if (pollset->current_pollable_obj != &fd->pollable_obj) { - grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj; - if (grpc_polling_trace.enabled()) { - gpr_log(GPR_DEBUG, - "PS:%p add fd %p; transition pollable from fd %p to multipoller", - pollset, fd, had_fd); - } - /* Introduce a spurious completion. - If we do not, then it may be that the fd-specific epoll set consumed - a completion without being polled, leading to a missed edge going up. */ - grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read"); - grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write"); - pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable_obj = &pollset->pollable_obj; - if (append_error(&error, pollable_materialize(&pollset->pollable_obj), - err_desc)) { - pollable_add_fd(&pollset->pollable_obj, had_fd); - pollable_add_fd(&pollset->pollable_obj, fd); - } - GRPC_CLOSURE_SCHED(exec_ctx, - GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + pollable *po_at_start = + POLLABLE_REF(pollset->active_pollable, "pollset_add_fd"); + switch (pollset->active_pollable->type) { + case PO_EMPTY: + /* empty pollable --> single fd pollable */ + error = pollset_transition_pollable_from_empty_to_fd_locked(exec_ctx, + pollset, fd); + break; + case PO_FD: + gpr_mu_lock(&po_at_start->owner_fd->orphan_mu); + if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & + 1) == 0) { + error = pollset_transition_pollable_from_empty_to_fd_locked( + exec_ctx, pollset, fd); + } else { + /* fd --> multipoller */ + error = pollset_transition_pollable_from_fd_to_multi_locked( + exec_ctx, pollset, fd); + } + gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu); + break; + case PO_MULTI: + error = pollable_add_fd(pollset->active_pollable, fd); + break; + } + if (error != GRPC_ERROR_NONE) { + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = po_at_start; + } else { + POLLABLE_UNREF(po_at_start, "pollset_add_fd"); + } + return error; +} + +static grpc_error *pollset_as_multipollable_locked(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + pollable **pollable_obj) { + grpc_error *error = GRPC_ERROR_NONE; + pollable *po_at_start = + POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable"); + switch (pollset->active_pollable->type) { + case PO_EMPTY: + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + error = pollable_create(PO_MULTI, &pollset->active_pollable); + break; + case PO_FD: + gpr_mu_lock(&po_at_start->owner_fd->orphan_mu); + if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & + 1) == 0) { + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + error = pollable_create(PO_MULTI, &pollset->active_pollable); + } else { + error = pollset_transition_pollable_from_fd_to_multi_locked( + exec_ctx, pollset, NULL); + } + gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu); + break; + case PO_MULTI: + break; + } + if (error != GRPC_ERROR_NONE) { + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + pollset->active_pollable = po_at_start; + *pollable_obj = NULL; + } else { + *pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set"); + POLLABLE_UNREF(po_at_start, "pollset_as_multipollable"); } return error; } static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->pollable_obj.po.mu); - grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false); - gpr_mu_unlock(&pollset->pollable_obj.po.mu); + gpr_mu_lock(&pollset->mu); + grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd); + gpr_mu_unlock(&pollset->mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); } @@ -1075,301 +1149,255 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * Pollset-set Definitions */ +static grpc_pollset_set *pss_lock_adam(grpc_pollset_set *pss) { + gpr_mu_lock(&pss->mu); + while (pss->parent != NULL) { + gpr_mu_unlock(&pss->mu); + pss = pss->parent; + gpr_mu_lock(&pss->mu); + } + return pss; +} + static grpc_pollset_set *pollset_set_create(void) { grpc_pollset_set *pss = (grpc_pollset_set *)gpr_zalloc(sizeof(*pss)); - po_init(&pss->po, PO_POLLSET_SET); + gpr_mu_init(&pss->mu); + gpr_ref_init(&pss->refs, 1); return pss; } -static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss) { - po_destroy(&pss->po); +static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) { + if (pss == NULL) return; + if (!gpr_unref(&pss->refs)) return; + pollset_set_unref(exec_ctx, pss->parent); + gpr_mu_destroy(&pss->mu); + for (size_t i = 0; i < pss->pollset_count; i++) { + gpr_mu_lock(&pss->pollsets[i]->mu); + if (0 == --pss->pollsets[i]->containing_pollset_set_count) { + pollset_maybe_finish_shutdown(exec_ctx, pss->pollsets[i]); + } + gpr_mu_unlock(&pss->pollsets[i]->mu); + } + for (size_t i = 0; i < pss->fd_count; i++) { + UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set"); + } + gpr_free(pss->pollsets); + gpr_free(pss->fds); gpr_free(pss); } static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_fd *fd) { - po_join(exec_ctx, &pss->po, &fd->pollable_obj.po); + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd); + } + grpc_error *error = GRPC_ERROR_NONE; + static const char *err_desc = "pollset_set_add_fd"; + pss = pss_lock_adam(pss); + for (size_t i = 0; i < pss->pollset_count; i++) { + append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd), + err_desc); + } + if (pss->fd_count == pss->fd_capacity) { + pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8); + pss->fds = + (grpc_fd **)gpr_realloc(pss->fds, pss->fd_capacity * sizeof(*pss->fds)); + } + REF_BY(fd, 2, "pollset_set"); + pss->fds[pss->fd_count++] = fd; + gpr_mu_unlock(&pss->mu); + + GRPC_LOG_IF_ERROR(err_desc, error); } static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, - grpc_fd *fd) {} - -static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) { - po_join(exec_ctx, &pss->po, &ps->pollable_obj.po); + grpc_fd *fd) { + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd); + } + pss = pss_lock_adam(pss); + size_t i; + for (i = 0; i < pss->fd_count; i++) { + if (pss->fds[i] == fd) { + UNREF_BY(exec_ctx, fd, 2, "pollset_set"); + break; + } + } + GPR_ASSERT(i != pss->fd_count); + for (; i < pss->fd_count - 1; i++) { + pss->fds[i] = pss->fds[i + 1]; + } + pss->fd_count--; + gpr_mu_unlock(&pss->mu); } static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pss, grpc_pollset *ps) {} - -static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) { - po_join(exec_ctx, &bag->po, &item->po); -} - -static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *bag, - grpc_pollset_set *item) {} - -static void po_init(polling_obj *po, polling_obj_type type) { - gpr_mu_init(&po->mu); - po->type = type; - po->group = NULL; - po->next = po; - po->prev = po; -} - -static polling_group *pg_lock_latest(polling_group *pg) { - /* assumes pg unlocked; consumes ref, returns ref */ - gpr_mu_lock(&pg->po.mu); - while (pg->po.group != NULL) { - polling_group *new_pg = pg_ref(pg->po.group); - gpr_mu_unlock(&pg->po.mu); - pg_unref(pg); - pg = new_pg; - gpr_mu_lock(&pg->po.mu); - } - return pg; -} - -static void po_destroy(polling_obj *po) { - if (po->group != NULL) { - polling_group *pg = pg_lock_latest(po->group); - po->prev->next = po->next; - po->next->prev = po->prev; - gpr_mu_unlock(&pg->po.mu); - pg_unref(pg); + grpc_pollset_set *pss, grpc_pollset *ps) { + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps); } - gpr_mu_destroy(&po->mu); -} - -static polling_group *pg_ref(polling_group *pg) { - gpr_ref(&pg->refs); - return pg; -} - -static void pg_unref(polling_group *pg) { - if (gpr_unref(&pg->refs)) { - po_destroy(&pg->po); - gpr_free(pg); + pss = pss_lock_adam(pss); + size_t i; + for (i = 0; i < pss->pollset_count; i++) { + if (pss->pollsets[i] == ps) { + break; + } } -} - -static int po_cmp(polling_obj *a, polling_obj *b) { - if (a == b) return 0; - if (a->type < b->type) return -1; - if (a->type > b->type) return 1; - if (a < b) return -1; - assert(a > b); - return 1; -} - -static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) { - switch (po_cmp(a, b)) { - case 0: - return; - case 1: - GPR_SWAP(polling_obj *, a, b); - /* fall through */ - case -1: - gpr_mu_lock(&a->mu); - gpr_mu_lock(&b->mu); - - if (a->group == NULL) { - if (b->group == NULL) { - polling_obj *initial_po[] = {a, b}; - pg_create(exec_ctx, initial_po, GPR_ARRAY_SIZE(initial_po)); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - } else { - polling_group *b_group = pg_ref(b->group); - gpr_mu_unlock(&b->mu); - gpr_mu_unlock(&a->mu); - pg_join(exec_ctx, b_group, a); - } - } else if (b->group == NULL) { - polling_group *a_group = pg_ref(a->group); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - pg_join(exec_ctx, a_group, b); - } else if (a->group == b->group) { - /* nothing to do */ - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - } else { - polling_group *a_group = pg_ref(a->group); - polling_group *b_group = pg_ref(b->group); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); - pg_merge(exec_ctx, a_group, b_group); - } + GPR_ASSERT(i != pss->pollset_count); + for (; i < pss->pollset_count - 1; i++) { + pss->pollsets[i] = pss->pollsets[i + 1]; } -} - -static void pg_notify(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) { - if (a->type == PO_FD && b->type == PO_POLLSET) { - pollset_add_fd_locked(exec_ctx, (grpc_pollset *)b, (grpc_fd *)a, true); - } else if (a->type == PO_POLLSET && b->type == PO_FD) { - pollset_add_fd_locked(exec_ctx, (grpc_pollset *)a, (grpc_fd *)b, true); + pss->pollset_count--; + gpr_mu_unlock(&pss->mu); + gpr_mu_lock(&ps->mu); + if (0 == --ps->containing_pollset_set_count) { + pollset_maybe_finish_shutdown(exec_ctx, ps); } + gpr_mu_unlock(&ps->mu); } -static void pg_broadcast(grpc_exec_ctx *exec_ctx, polling_group *from, - polling_group *to) { - for (polling_obj *a = from->po.next; a != &from->po; a = a->next) { - for (polling_obj *b = to->po.next; b != &to->po; b = b->next) { - if (po_cmp(a, b) < 0) { - gpr_mu_lock(&a->mu); - gpr_mu_lock(&b->mu); - } else { - GPR_ASSERT(po_cmp(a, b) != 0); - gpr_mu_lock(&b->mu); - gpr_mu_lock(&a->mu); +// add all fds to pollables, and output a new array of unorphaned out_fds +// assumes pollsets are multipollable +static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds, + size_t fd_count, grpc_pollset **pollsets, + size_t pollset_count, + const char *err_desc, grpc_fd **out_fds, + size_t *out_fd_count) { + grpc_error *error = GRPC_ERROR_NONE; + for (size_t i = 0; i < fd_count; i++) { + gpr_mu_lock(&fds[i]->orphan_mu); + if ((gpr_atm_no_barrier_load(&fds[i]->refst) & 1) == 0) { + gpr_mu_unlock(&fds[i]->orphan_mu); + UNREF_BY(exec_ctx, fds[i], 2, "pollset_set"); + } else { + for (size_t j = 0; j < pollset_count; j++) { + append_error(&error, + pollable_add_fd(pollsets[j]->active_pollable, fds[i]), + err_desc); } - pg_notify(exec_ctx, a, b); - gpr_mu_unlock(&a->mu); - gpr_mu_unlock(&b->mu); + gpr_mu_unlock(&fds[i]->orphan_mu); + out_fds[(*out_fd_count)++] = fds[i]; } } + return error; } -static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po, - size_t initial_po_count) { - /* assumes all polling objects in initial_po are locked */ - polling_group *pg = (polling_group *)gpr_malloc(sizeof(*pg)); - po_init(&pg->po, PO_POLLING_GROUP); - gpr_ref_init(&pg->refs, (int)initial_po_count); - for (size_t i = 0; i < initial_po_count; i++) { - GPR_ASSERT(initial_po[i]->group == NULL); - initial_po[i]->group = pg; - } - for (size_t i = 1; i < initial_po_count; i++) { - initial_po[i]->prev = initial_po[i - 1]; - } - for (size_t i = 0; i < initial_po_count - 1; i++) { - initial_po[i]->next = initial_po[i + 1]; - } - initial_po[0]->prev = &pg->po; - initial_po[initial_po_count - 1]->next = &pg->po; - pg->po.next = initial_po[0]; - pg->po.prev = initial_po[initial_po_count - 1]; - for (size_t i = 1; i < initial_po_count; i++) { - for (size_t j = 0; j < i; j++) { - pg_notify(exec_ctx, initial_po[i], initial_po[j]); - } +static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pss, grpc_pollset *ps) { + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps); } -} - -static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, - polling_obj *po) { - /* assumes neither pg nor po are locked; consumes one ref to pg */ - pg = pg_lock_latest(pg); - /* pg locked */ - for (polling_obj *existing = pg->po.next /* skip pg - it's just a stub */; - existing != &pg->po; existing = existing->next) { - if (po_cmp(po, existing) < 0) { - gpr_mu_lock(&po->mu); - gpr_mu_lock(&existing->mu); - } else { - GPR_ASSERT(po_cmp(po, existing) != 0); - gpr_mu_lock(&existing->mu); - gpr_mu_lock(&po->mu); - } - /* pg, po, existing locked */ - if (po->group != NULL) { - gpr_mu_unlock(&pg->po.mu); - polling_group *po_group = pg_ref(po->group); - gpr_mu_unlock(&po->mu); - gpr_mu_unlock(&existing->mu); - pg_merge(exec_ctx, pg, po_group); - /* early exit: polling obj picked up a group during joining: we needed - to do a full merge */ - return; - } - pg_notify(exec_ctx, po, existing); - gpr_mu_unlock(&po->mu); - gpr_mu_unlock(&existing->mu); - } - gpr_mu_lock(&po->mu); - if (po->group != NULL) { - gpr_mu_unlock(&pg->po.mu); - polling_group *po_group = pg_ref(po->group); - gpr_mu_unlock(&po->mu); - pg_merge(exec_ctx, pg, po_group); - /* early exit: polling obj picked up a group during joining: we needed - to do a full merge */ + grpc_error *error = GRPC_ERROR_NONE; + static const char *err_desc = "pollset_set_add_pollset"; + pollable *pollable_obj = NULL; + gpr_mu_lock(&ps->mu); + if (!GRPC_LOG_IF_ERROR(err_desc, pollset_as_multipollable_locked( + exec_ctx, ps, &pollable_obj))) { + GPR_ASSERT(pollable_obj == NULL); + gpr_mu_unlock(&ps->mu); return; } - po->group = pg; - po->next = &pg->po; - po->prev = pg->po.prev; - po->prev->next = po->next->prev = po; - gpr_mu_unlock(&pg->po.mu); - gpr_mu_unlock(&po->mu); + ps->containing_pollset_set_count++; + gpr_mu_unlock(&ps->mu); + pss = pss_lock_adam(pss); + size_t initial_fd_count = pss->fd_count; + pss->fd_count = 0; + append_error(&error, + add_fds_to_pollsets(exec_ctx, pss->fds, initial_fd_count, &ps, 1, + err_desc, pss->fds, &pss->fd_count), + err_desc); + if (pss->pollset_count == pss->pollset_capacity) { + pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8); + pss->pollsets = (grpc_pollset **)gpr_realloc( + pss->pollsets, pss->pollset_capacity * sizeof(*pss->pollsets)); + } + pss->pollsets[pss->pollset_count++] = ps; + gpr_mu_unlock(&pss->mu); + POLLABLE_UNREF(pollable_obj, "pollset_set"); + + GRPC_LOG_IF_ERROR(err_desc, error); } -static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a, - polling_group *b) { +static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *a, + grpc_pollset_set *b) { + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b); + } + grpc_error *error = GRPC_ERROR_NONE; + static const char *err_desc = "pollset_set_add_fd"; for (;;) { if (a == b) { - pg_unref(a); - pg_unref(b); + // pollset ancestors are the same: nothing to do return; } - if (a > b) GPR_SWAP(polling_group *, a, b); - gpr_mu_lock(&a->po.mu); - gpr_mu_lock(&b->po.mu); - if (a->po.group != NULL) { - polling_group *m2 = pg_ref(a->po.group); - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - pg_unref(a); - a = m2; - } else if (b->po.group != NULL) { - polling_group *m2 = pg_ref(b->po.group); - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - pg_unref(b); - b = m2; + if (a > b) { + GPR_SWAP(grpc_pollset_set *, a, b); + } + gpr_mu *a_mu = &a->mu; + gpr_mu *b_mu = &b->mu; + gpr_mu_lock(a_mu); + gpr_mu_lock(b_mu); + if (a->parent != NULL) { + a = a->parent; + } else if (b->parent != NULL) { + b = b->parent; } else { - break; + break; // exit loop, both pollsets locked } + gpr_mu_unlock(a_mu); + gpr_mu_unlock(b_mu); } - polling_group **unref = NULL; - size_t unref_count = 0; - size_t unref_cap = 0; - b->po.group = a; - pg_broadcast(exec_ctx, a, b); - pg_broadcast(exec_ctx, b, a); - while (b->po.next != &b->po) { - polling_obj *po = b->po.next; - gpr_mu_lock(&po->mu); - if (unref_count == unref_cap) { - unref_cap = GPR_MAX(8, 3 * unref_cap / 2); - unref = (polling_group **)gpr_realloc(unref, unref_cap * sizeof(*unref)); - } - unref[unref_count++] = po->group; - po->group = pg_ref(a); - // unlink from b - po->prev->next = po->next; - po->next->prev = po->prev; - // link to a - po->next = &a->po; - po->prev = a->po.prev; - po->next->prev = po->prev->next = po; - gpr_mu_unlock(&po->mu); - } - gpr_mu_unlock(&a->po.mu); - gpr_mu_unlock(&b->po.mu); - for (size_t i = 0; i < unref_count; i++) { - pg_unref(unref[i]); - } - gpr_free(unref); - pg_unref(b); + // try to do the least copying possible + // TODO(ctiller): there's probably a better heuristic here + const size_t a_size = a->fd_count + a->pollset_count; + const size_t b_size = b->fd_count + b->pollset_count; + if (b_size > a_size) { + GPR_SWAP(grpc_pollset_set *, a, b); + } + if (grpc_polling_trace.enabled()) { + gpr_log(GPR_DEBUG, "PSS: parent %p to %p", b, a); + } + gpr_ref(&a->refs); + b->parent = a; + if (a->fd_capacity < a->fd_count + b->fd_count) { + a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count); + a->fds = (grpc_fd **)gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds)); + } + size_t initial_a_fd_count = a->fd_count; + a->fd_count = 0; + append_error(&error, add_fds_to_pollsets(exec_ctx, a->fds, initial_a_fd_count, + b->pollsets, b->pollset_count, + "merge_a2b", a->fds, &a->fd_count), + err_desc); + append_error(&error, add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count, + a->pollsets, a->pollset_count, + "merge_b2a", a->fds, &a->fd_count), + err_desc); + if (a->pollset_capacity < a->pollset_count + b->pollset_count) { + a->pollset_capacity = + GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count); + a->pollsets = (grpc_pollset **)gpr_realloc( + a->pollsets, a->pollset_capacity * sizeof(*a->pollsets)); + } + if (b->pollset_count > 0) { + memcpy(a->pollsets + a->pollset_count, b->pollsets, + b->pollset_count * sizeof(*b->pollsets)); + } + a->pollset_count += b->pollset_count; + gpr_free(b->fds); + gpr_free(b->pollsets); + b->fds = NULL; + b->pollsets = NULL; + b->fd_count = b->fd_capacity = b->pollset_count = b->pollset_capacity = 0; + gpr_mu_unlock(&a->mu); + gpr_mu_unlock(&b->mu); } +static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) {} + /******************************************************************************* * Event engine binding */ @@ -1399,7 +1427,7 @@ static const grpc_event_engine_vtable vtable = { pollset_add_fd, pollset_set_create, - pollset_set_destroy, + pollset_set_unref, // destroy ==> unref 1 public ref pollset_set_add_pollset, pollset_set_del_pollset, pollset_set_add_pollset_set, @@ -1412,6 +1440,10 @@ static const grpc_event_engine_vtable vtable = { const grpc_event_engine_vtable *grpc_init_epollex_linux( bool explicitly_requested) { + if (!explicitly_requested) { + return NULL; + } + if (!grpc_has_wakeup_fd()) { return NULL; } diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 45403c3037..28e4efa011 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -91,12 +91,9 @@ const grpc_event_engine_vtable *init_non_polling(bool explicit_request) { } // namespace static const event_engine_factory g_factories[] = { - {"epoll1", grpc_init_epoll1_linux}, - {"epollsig", grpc_init_epollsig_linux}, - {"poll", grpc_init_poll_posix}, - {"poll-cv", grpc_init_poll_cv_posix}, - {"epollex", grpc_init_epollex_linux}, - {"none", init_non_polling}, + {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux}, + {"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix}, + {"poll-cv", grpc_init_poll_cv_posix}, {"none", init_non_polling}, }; static void add(const char *beg, const char *end, char ***ss, size_t *ns) { diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc index 6707f40f10..e4d956dbef 100644 --- a/src/core/lib/iomgr/exec_ctx.cc +++ b/src/core/lib/iomgr/exec_ctx.cc @@ -152,6 +152,15 @@ void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) { gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock_type) { + // special-case infinities as grpc_millis can be 32bit on some platforms + // while gpr_time_from_millis always takes an int64_t. + if (millis == GRPC_MILLIS_INF_FUTURE) { + return gpr_inf_future(clock_type); + } + if (millis == GRPC_MILLIS_INF_PAST) { + return gpr_inf_past(clock_type); + } + if (clock_type == GPR_TIMESPAN) { return gpr_time_from_millis(millis, GPR_TIMESPAN); } diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index ae8d7b4724..cfdacaf988 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -186,7 +186,7 @@ static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { } if (old_count == 0) { GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx); - p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size()); + p = (backup_poller *)gpr_zalloc(sizeof(*p) + grpc_pollset_size()); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p); } diff --git a/src/core/lib/security/credentials/fake/fake_credentials.cc b/src/core/lib/security/credentials/fake/fake_credentials.cc index 795ca0660a..cf10bf24c8 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.cc +++ b/src/core/lib/security/credentials/fake/fake_credentials.cc @@ -38,7 +38,8 @@ static grpc_security_status fake_transport_security_create_security_connector( grpc_call_credentials *call_creds, const char *target, const grpc_channel_args *args, grpc_channel_security_connector **sc, grpc_channel_args **new_args) { - *sc = grpc_fake_channel_security_connector_create(call_creds, target, args); + *sc = + grpc_fake_channel_security_connector_create(c, call_creds, target, args); return GRPC_SECURITY_OK; } @@ -46,7 +47,7 @@ static grpc_security_status fake_transport_security_server_create_security_connector( grpc_exec_ctx *exec_ctx, grpc_server_credentials *c, grpc_server_security_connector **sc) { - *sc = grpc_fake_server_security_connector_create(); + *sc = grpc_fake_server_security_connector_create(c); return GRPC_SECURITY_OK; } diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc index 8258bd7cd1..e58f4eed1c 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.cc @@ -262,7 +262,7 @@ static bool oauth2_token_fetcher_get_request_metadata( grpc_mdelem cached_access_token_md = GRPC_MDNULL; gpr_mu_lock(&c->mu); if (!GRPC_MDISNULL(c->access_token_md) && - (c->token_expiration + grpc_exec_ctx_now(exec_ctx) > refresh_threshold)) { + (c->token_expiration - grpc_exec_ctx_now(exec_ctx) > refresh_threshold)) { cached_access_token_md = GRPC_MDELEM_REF(c->access_token_md); } if (!GRPC_MDISNULL(cached_access_token_md)) { diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc index 9df69a2a3d..290336adc0 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc @@ -62,7 +62,8 @@ static grpc_security_status ssl_create_security_connector( } } status = grpc_ssl_channel_security_connector_create( - exec_ctx, call_creds, &c->config, target, overridden_target_name, sc); + exec_ctx, creds, call_creds, &c->config, target, overridden_target_name, + sc); if (status != GRPC_SECURITY_OK) { return status; } @@ -128,7 +129,8 @@ static grpc_security_status ssl_server_create_security_connector( grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds, grpc_server_security_connector **sc) { grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds; - return grpc_ssl_server_security_connector_create(exec_ctx, &c->config, sc); + return grpc_ssl_server_security_connector_create(exec_ctx, creds, &c->config, + sc); } static grpc_server_credentials_vtable ssl_server_vtable = { diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc index 3c19d5e60a..31e2fd0107 100644 --- a/src/core/lib/security/transport/security_connector.cc +++ b/src/core/lib/security/transport/security_connector.cc @@ -136,6 +136,39 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, } } +int grpc_security_connector_cmp(grpc_security_connector *sc, + grpc_security_connector *other) { + if (sc == NULL || other == NULL) return GPR_ICMP(sc, other); + int c = GPR_ICMP(sc->vtable, other->vtable); + if (c != 0) return c; + return sc->vtable->cmp(sc, other); +} + +int grpc_channel_security_connector_cmp(grpc_channel_security_connector *sc1, + grpc_channel_security_connector *sc2) { + GPR_ASSERT(sc1->channel_creds != NULL); + GPR_ASSERT(sc2->channel_creds != NULL); + int c = GPR_ICMP(sc1->channel_creds, sc2->channel_creds); + if (c != 0) return c; + c = GPR_ICMP(sc1->request_metadata_creds, sc2->request_metadata_creds); + if (c != 0) return c; + c = GPR_ICMP((void *)sc1->check_call_host, (void *)sc2->check_call_host); + if (c != 0) return c; + c = GPR_ICMP((void *)sc1->cancel_check_call_host, + (void *)sc2->cancel_check_call_host); + if (c != 0) return c; + return GPR_ICMP((void *)sc1->add_handshakers, (void *)sc2->add_handshakers); +} + +int grpc_server_security_connector_cmp(grpc_server_security_connector *sc1, + grpc_server_security_connector *sc2) { + GPR_ASSERT(sc1->server_creds != NULL); + GPR_ASSERT(sc2->server_creds != NULL); + int c = GPR_ICMP(sc1->server_creds, sc2->server_creds); + if (c != 0) return c; + return GPR_ICMP((void *)sc1->add_handshakers, (void *)sc2->add_handshakers); +} + bool grpc_channel_security_connector_check_call_host( grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, const char *host, grpc_auth_context *auth_context, @@ -199,25 +232,27 @@ void grpc_security_connector_unref(grpc_exec_ctx *exec_ctx, if (gpr_unref(&sc->refcount)) sc->vtable->destroy(exec_ctx, sc); } -static void connector_pointer_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) { +static void connector_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) { GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, (grpc_security_connector *)p, - "connector_pointer_arg_destroy"); + "connector_arg_destroy"); } -static void *connector_pointer_arg_copy(void *p) { +static void *connector_arg_copy(void *p) { return GRPC_SECURITY_CONNECTOR_REF((grpc_security_connector *)p, - "connector_pointer_arg_copy"); + "connector_arg_copy"); } -static int connector_pointer_cmp(void *a, void *b) { return GPR_ICMP(a, b); } +static int connector_cmp(void *a, void *b) { + return grpc_security_connector_cmp((grpc_security_connector *)a, + (grpc_security_connector *)b); +} -static const grpc_arg_pointer_vtable connector_pointer_vtable = { - connector_pointer_arg_copy, connector_pointer_arg_destroy, - connector_pointer_cmp}; +static const grpc_arg_pointer_vtable connector_arg_vtable = { + connector_arg_copy, connector_arg_destroy, connector_cmp}; grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc) { return grpc_channel_arg_pointer_create((char *)GRPC_ARG_SECURITY_CONNECTOR, - sc, &connector_pointer_vtable); + sc, &connector_arg_vtable); } grpc_security_connector *grpc_security_connector_from_arg(const grpc_arg *arg) { @@ -382,6 +417,32 @@ static void fake_server_check_peer(grpc_exec_ctx *exec_ctx, fake_check_peer(exec_ctx, sc, peer, auth_context, on_peer_checked); } +static int fake_channel_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + grpc_fake_channel_security_connector *c1 = + (grpc_fake_channel_security_connector *)sc1; + grpc_fake_channel_security_connector *c2 = + (grpc_fake_channel_security_connector *)sc2; + int c = grpc_channel_security_connector_cmp(&c1->base, &c2->base); + if (c != 0) return c; + c = strcmp(c1->target, c2->target); + if (c != 0) return c; + if (c1->expected_targets == NULL || c2->expected_targets == NULL) { + c = GPR_ICMP(c1->expected_targets, c2->expected_targets); + } else { + c = strcmp(c1->expected_targets, c2->expected_targets); + } + if (c != 0) return c; + return GPR_ICMP(c1->is_lb_channel, c2->is_lb_channel); +} + +static int fake_server_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + return grpc_server_security_connector_cmp( + (grpc_server_security_connector *)sc1, + (grpc_server_security_connector *)sc2); +} + static bool fake_channel_check_call_host(grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, const char *host, @@ -418,12 +479,13 @@ static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx, } static grpc_security_connector_vtable fake_channel_vtable = { - fake_channel_destroy, fake_channel_check_peer}; + fake_channel_destroy, fake_channel_check_peer, fake_channel_cmp}; static grpc_security_connector_vtable fake_server_vtable = { - fake_server_destroy, fake_server_check_peer}; + fake_server_destroy, fake_server_check_peer, fake_server_cmp}; grpc_channel_security_connector *grpc_fake_channel_security_connector_create( + grpc_channel_credentials *channel_creds, grpc_call_credentials *request_metadata_creds, const char *target, const grpc_channel_args *args) { grpc_fake_channel_security_connector *c = @@ -431,6 +493,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( gpr_ref_init(&c->base.base.refcount, 1); c->base.base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; c->base.base.vtable = &fake_channel_vtable; + c->base.channel_creds = channel_creds; c->base.request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->base.check_call_host = fake_channel_check_call_host; @@ -444,13 +507,14 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( } grpc_server_security_connector *grpc_fake_server_security_connector_create( - void) { + grpc_server_credentials *server_creds) { grpc_server_security_connector *c = (grpc_server_security_connector *)gpr_zalloc( sizeof(grpc_server_security_connector)); gpr_ref_init(&c->base.refcount, 1); c->base.vtable = &fake_server_vtable; c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; + c->server_creds = server_creds; c->add_handshakers = fake_server_add_handshakers; return c; } @@ -473,6 +537,7 @@ static void ssl_channel_destroy(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc) { grpc_ssl_channel_security_connector *c = (grpc_ssl_channel_security_connector *)sc; + grpc_channel_credentials_unref(exec_ctx, c->base.channel_creds); grpc_call_credentials_unref(exec_ctx, c->base.request_metadata_creds); tsi_ssl_client_handshaker_factory_unref(c->client_handshaker_factory); c->client_handshaker_factory = NULL; @@ -485,6 +550,7 @@ static void ssl_server_destroy(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc) { grpc_ssl_server_security_connector *c = (grpc_ssl_server_security_connector *)sc; + grpc_server_credentials_unref(exec_ctx, c->base.server_creds); tsi_ssl_server_handshaker_factory_unref(c->server_handshaker_factory); c->server_handshaker_factory = NULL; gpr_free(sc); @@ -641,6 +707,29 @@ static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx, GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error); } +static int ssl_channel_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + grpc_ssl_channel_security_connector *c1 = + (grpc_ssl_channel_security_connector *)sc1; + grpc_ssl_channel_security_connector *c2 = + (grpc_ssl_channel_security_connector *)sc2; + int c = grpc_channel_security_connector_cmp(&c1->base, &c2->base); + if (c != 0) return c; + c = strcmp(c1->target_name, c2->target_name); + if (c != 0) return c; + return (c1->overridden_target_name == NULL || + c2->overridden_target_name == NULL) + ? GPR_ICMP(c1->overridden_target_name, c2->overridden_target_name) + : strcmp(c1->overridden_target_name, c2->overridden_target_name); +} + +static int ssl_server_cmp(grpc_security_connector *sc1, + grpc_security_connector *sc2) { + return grpc_server_security_connector_cmp( + (grpc_server_security_connector *)sc1, + (grpc_server_security_connector *)sc2); +} + static void add_shallow_auth_property_to_peer(tsi_peer *peer, const grpc_auth_property *prop, const char *tsi_prop_name) { @@ -717,10 +806,10 @@ static void ssl_channel_cancel_check_call_host( } static grpc_security_connector_vtable ssl_channel_vtable = { - ssl_channel_destroy, ssl_channel_check_peer}; + ssl_channel_destroy, ssl_channel_check_peer, ssl_channel_cmp}; static grpc_security_connector_vtable ssl_server_vtable = { - ssl_server_destroy, ssl_server_check_peer}; + ssl_server_destroy, ssl_server_check_peer, ssl_server_cmp}; /* returns a NULL terminated slice. */ static grpc_slice compute_default_pem_root_certs_once(void) { @@ -804,7 +893,8 @@ const char *grpc_get_default_ssl_roots(void) { } grpc_security_status grpc_ssl_channel_security_connector_create( - grpc_exec_ctx *exec_ctx, grpc_call_credentials *request_metadata_creds, + grpc_exec_ctx *exec_ctx, grpc_channel_credentials *channel_creds, + grpc_call_credentials *request_metadata_creds, const grpc_ssl_config *config, const char *target_name, const char *overridden_target_name, grpc_channel_security_connector **sc) { size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions(); @@ -840,6 +930,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create( gpr_ref_init(&c->base.base.refcount, 1); c->base.base.vtable = &ssl_channel_vtable; c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; + c->base.channel_creds = grpc_channel_credentials_ref(channel_creds); c->base.request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->base.check_call_host = ssl_channel_check_call_host; @@ -874,8 +965,8 @@ error: } grpc_security_status grpc_ssl_server_security_connector_create( - grpc_exec_ctx *exec_ctx, const grpc_ssl_server_config *config, - grpc_server_security_connector **sc) { + grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds, + const grpc_ssl_server_config *config, grpc_server_security_connector **sc) { size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions(); const char **alpn_protocol_strings = (const char **)gpr_malloc(sizeof(const char *) * num_alpn_protocols); @@ -897,6 +988,7 @@ grpc_security_status grpc_ssl_server_security_connector_create( gpr_ref_init(&c->base.base.refcount, 1); c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; c->base.base.vtable = &ssl_server_vtable; + c->base.server_creds = grpc_server_credentials_ref(server_creds); result = tsi_create_ssl_server_handshaker_factory_ex( config->pem_key_cert_pairs, config->num_key_cert_pairs, config->pem_root_certs, get_tsi_client_certificate_request_type( diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index e4ee2110eb..38b53a2b8e 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -60,13 +60,9 @@ typedef struct { void (*check_peer)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc, tsi_peer peer, grpc_auth_context **auth_context, grpc_closure *on_peer_checked); + int (*cmp)(grpc_security_connector *sc, grpc_security_connector *other); } grpc_security_connector_vtable; -typedef struct grpc_security_connector_handshake_list { - void *handshake; - struct grpc_security_connector_handshake_list *next; -} grpc_security_connector_handshake_list; - struct grpc_security_connector { const grpc_security_connector_vtable *vtable; gpr_refcount refcount; @@ -104,6 +100,10 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, grpc_auth_context **auth_context, grpc_closure *on_peer_checked); +/* Compares two security connectors. */ +int grpc_security_connector_cmp(grpc_security_connector *sc, + grpc_security_connector *other); + /* Util to encapsulate the connector in a channel arg. */ grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc); @@ -116,13 +116,14 @@ grpc_security_connector *grpc_security_connector_find_in_args( /* --- channel_security_connector object. --- - A channel security connector object represents away to configure the + A channel security connector object represents a way to configure the underlying transport security mechanism on the client side. */ typedef struct grpc_channel_security_connector grpc_channel_security_connector; struct grpc_channel_security_connector { grpc_security_connector base; + grpc_channel_credentials *channel_creds; grpc_call_credentials *request_metadata_creds; bool (*check_call_host)(grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, const char *host, @@ -138,6 +139,10 @@ struct grpc_channel_security_connector { grpc_handshake_manager *handshake_mgr); }; +/// A helper function for use in grpc_security_connector_cmp() implementations. +int grpc_channel_security_connector_cmp(grpc_channel_security_connector *sc1, + grpc_channel_security_connector *sc2); + /// Checks that the host that will be set for a call is acceptable. /// Returns true if completed synchronously, in which case \a error will /// be set to indicate the result. Otherwise, \a on_call_host_checked @@ -161,18 +166,23 @@ void grpc_channel_security_connector_add_handshakers( /* --- server_security_connector object. --- - A server security connector object represents away to configure the + A server security connector object represents a way to configure the underlying transport security mechanism on the server side. */ typedef struct grpc_server_security_connector grpc_server_security_connector; struct grpc_server_security_connector { grpc_security_connector base; + grpc_server_credentials *server_creds; void (*add_handshakers)(grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, grpc_handshake_manager *handshake_mgr); }; +/// A helper function for use in grpc_security_connector_cmp() implementations. +int grpc_server_security_connector_cmp(grpc_server_security_connector *sc1, + grpc_server_security_connector *sc2); + void grpc_server_security_connector_add_handshakers( grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, grpc_handshake_manager *handshake_mgr); @@ -182,13 +192,14 @@ void grpc_server_security_connector_add_handshakers( /* For TESTING ONLY! Creates a fake connector that emulates real channel security. */ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( + grpc_channel_credentials *channel_creds, grpc_call_credentials *request_metadata_creds, const char *target, const grpc_channel_args *args); /* For TESTING ONLY! Creates a fake connector that emulates real server security. */ grpc_server_security_connector *grpc_fake_server_security_connector_create( - void); + grpc_server_credentials *server_creds); /* Config for ssl clients. */ @@ -211,7 +222,8 @@ typedef struct { specific error code otherwise. */ grpc_security_status grpc_ssl_channel_security_connector_create( - grpc_exec_ctx *exec_ctx, grpc_call_credentials *request_metadata_creds, + grpc_exec_ctx *exec_ctx, grpc_channel_credentials *channel_creds, + grpc_call_credentials *request_metadata_creds, const grpc_ssl_config *config, const char *target_name, const char *overridden_target_name, grpc_channel_security_connector **sc); @@ -236,8 +248,8 @@ typedef struct { specific error code otherwise. */ grpc_security_status grpc_ssl_server_security_connector_create( - grpc_exec_ctx *exec_ctx, const grpc_ssl_server_config *config, - grpc_server_security_connector **sc); + grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds, + const grpc_ssl_server_config *config, grpc_server_security_connector **sc); /* Util. */ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer, diff --git a/src/core/lib/support/cpu_linux.cc b/src/core/lib/support/cpu_linux.cc index 2280668442..53619caa5f 100644 --- a/src/core/lib/support/cpu_linux.cc +++ b/src/core/lib/support/cpu_linux.cc @@ -38,8 +38,9 @@ static int ncpus = 0; static void init_num_cpus() { /* This must be signed. sysconf returns -1 when the number cannot be determined */ + int cpu = sched_getcpu(); ncpus = (int)sysconf(_SC_NPROCESSORS_ONLN); - if (ncpus < 1) { + if (ncpus < 1 || cpu < 0) { gpr_log(GPR_ERROR, "Cannot determine number of CPUs: assuming 1"); ncpus = 1; } @@ -56,6 +57,9 @@ unsigned gpr_cpu_current_cpu(void) { // sched_getcpu() is undefined on musl return 0; #else + if (gpr_cpu_num_cores() == 1) { + return 0; + } int cpu = sched_getcpu(); if (cpu < 0) { gpr_log(GPR_ERROR, "Error determining current CPU: %s\n", strerror(errno)); diff --git a/src/core/lib/support/memory.h b/src/core/lib/support/memory.h index dc3d32e1c2..6b336681db 100644 --- a/src/core/lib/support/memory.h +++ b/src/core/lib/support/memory.h @@ -21,6 +21,7 @@ #include <grpc/support/alloc.h> +#include <limits> #include <memory> #include <utility> @@ -54,6 +55,46 @@ inline UniquePtr<T> MakeUnique(Args&&... args) { return UniquePtr<T>(New<T>(std::forward<Args>(args)...)); } +// an allocator that uses gpr_malloc/gpr_free +template <class T> +class Allocator { + public: + typedef T value_type; + typedef T* pointer; + typedef const T* const_pointer; + typedef T& reference; + typedef const T& const_reference; + typedef std::size_t size_type; + typedef std::ptrdiff_t difference_type; + typedef std::false_type propagate_on_container_move_assignment; + template <class U> + struct rebind { + typedef Allocator<U> other; + }; + typedef std::true_type is_always_equal; + + pointer address(reference x) const { return &x; } + const_pointer address(const_reference x) const { return &x; } + pointer allocate(std::size_t n, + std::allocator<void>::const_pointer hint = 0) { + return static_cast<pointer>(gpr_malloc(n * sizeof(T))); + } + void deallocate(T* p, std::size_t n) { gpr_free(p); } + size_t max_size() const { + return std::numeric_limits<size_type>::max() / sizeof(value_type); + } + void construct(pointer p, const_reference val) { new ((void*)p) T(val); } + template <class U, class... Args> + void construct(U* p, Args&&... args) { + ::new ((void*)p) U(std::forward<Args>(args)...); + } + void destroy(pointer p) { p->~T(); } + template <class U> + void destroy(U* p) { + p->~U(); + } +}; + } // namespace grpc_core #endif /* GRPC_CORE_LIB_SUPPORT_MEMORY_H */ diff --git a/src/core/lib/support/vector.h b/src/core/lib/support/vector.h new file mode 100644 index 0000000000..4a7db80676 --- /dev/null +++ b/src/core/lib/support/vector.h @@ -0,0 +1,32 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_VECTOR_H +#define GRPC_CORE_LIB_SUPPORT_VECTOR_H + +#include "absl/container/inlined_vector.h" +#include "src/core/lib/support/memory.h" + +namespace grpc_core { + +template <typename T, size_t N> +using InlinedVector = absl::InlinedVector<T, N, Allocator<T>>; + +} // namespace grpc_core + +#endif diff --git a/src/core/lib/transport/bdp_estimator.cc b/src/core/lib/transport/bdp_estimator.cc index 6eb2f54864..36826f45ca 100644 --- a/src/core/lib/transport/bdp_estimator.cc +++ b/src/core/lib/transport/bdp_estimator.cc @@ -32,13 +32,12 @@ BdpEstimator::BdpEstimator(const char *name) accumulator_(0), estimate_(65536), ping_start_time_(gpr_time_0(GPR_CLOCK_MONOTONIC)), - next_ping_scheduled_(0), inter_ping_delay_(100.0), // start at 100ms stable_estimate_count_(0), bw_est_(0), name_(name) {} -void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) { +grpc_millis BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec dt_ts = gpr_time_sub(now, ping_start_time_); double dt = (double)dt_ts.tv_sec + 1e-9 * (double)dt_ts.tv_nsec; @@ -78,7 +77,7 @@ void BdpEstimator::CompletePing(grpc_exec_ctx *exec_ctx) { } ping_state_ = PingState::UNSCHEDULED; accumulator_ = 0; - next_ping_scheduled_ = grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_; + return grpc_exec_ctx_now(exec_ctx) + inter_ping_delay_; } } // namespace grpc_core diff --git a/src/core/lib/transport/bdp_estimator.h b/src/core/lib/transport/bdp_estimator.h index 681595c34b..3558eb49e7 100644 --- a/src/core/lib/transport/bdp_estimator.h +++ b/src/core/lib/transport/bdp_estimator.h @@ -40,30 +40,11 @@ class BdpEstimator { explicit BdpEstimator(const char *name); ~BdpEstimator() {} - // Returns true if a reasonable estimate could be obtained - bool EstimateBdp(int64_t *estimate_out) const { - *estimate_out = estimate_; - return true; - } - bool EstimateBandwidth(double *bw_out) const { - *bw_out = bw_est_; - return true; - } + int64_t EstimateBdp() const { return estimate_; } + double EstimateBandwidth() const { return bw_est_; } void AddIncomingBytes(int64_t num_bytes) { accumulator_ += num_bytes; } - // Returns true if the user should schedule a ping - bool NeedPing(grpc_exec_ctx *exec_ctx) const { - switch (ping_state_) { - case PingState::UNSCHEDULED: - return grpc_exec_ctx_now(exec_ctx) >= next_ping_scheduled_; - case PingState::SCHEDULED: - case PingState::STARTED: - return false; - } - GPR_UNREACHABLE_CODE(return false); - } - // Schedule a ping: call in response to receiving a true from // grpc_bdp_estimator_add_incoming_bytes once a ping has been scheduled by a // transport (but not necessarily started) @@ -91,8 +72,8 @@ class BdpEstimator { ping_start_time_ = gpr_now(GPR_CLOCK_MONOTONIC); } - // Completes a previously started ping - void CompletePing(grpc_exec_ctx *exec_ctx); + // Completes a previously started ping, returns when to schedule the next one + grpc_millis CompletePing(grpc_exec_ctx *exec_ctx); private: enum class PingState { UNSCHEDULED, SCHEDULED, STARTED }; @@ -102,8 +83,6 @@ class BdpEstimator { int64_t estimate_; // when was the current ping started? gpr_timespec ping_start_time_; - // when should the next ping start? - grpc_millis next_ping_scheduled_; int inter_ping_delay_; int stable_estimate_count_; double bw_est_; diff --git a/src/core/lib/transport/metadata.cc b/src/core/lib/transport/metadata.cc index 99eccba34e..1eddb6f0d6 100644 --- a/src/core/lib/transport/metadata.cc +++ b/src/core/lib/transport/metadata.cc @@ -351,11 +351,14 @@ static size_t get_base64_encoded_size(size_t raw_length) { return raw_length / 3 * 4 + tail_xtra[raw_length % 3]; } -size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem) { +size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem, + bool use_true_binary_metadata) { size_t overhead_and_key = 32 + GRPC_SLICE_LENGTH(GRPC_MDKEY(elem)); size_t value_len = GRPC_SLICE_LENGTH(GRPC_MDVALUE(elem)); if (grpc_is_binary_header(GRPC_MDKEY(elem))) { - return overhead_and_key + get_base64_encoded_size(value_len); + return overhead_and_key + (use_true_binary_metadata + ? value_len + 1 + : get_base64_encoded_size(value_len)); } else { return overhead_and_key + value_len; } diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index a70ebc4921..66780d925b 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -132,7 +132,8 @@ grpc_mdelem grpc_mdelem_create( bool grpc_mdelem_eq(grpc_mdelem a, grpc_mdelem b); -size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem); +size_t grpc_mdelem_get_size_in_hpack_table(grpc_mdelem elem, + bool use_true_binary_metadata); /* Mutator and accessor for grpc_mdelem user data. The destructor function is used as a type tag and is checked during user_data fetch. */ diff --git a/src/core/lib/transport/pid_controller.cc b/src/core/lib/transport/pid_controller.cc index 4b304f17b2..9f7750d693 100644 --- a/src/core/lib/transport/pid_controller.cc +++ b/src/core/lib/transport/pid_controller.cc @@ -19,45 +19,30 @@ #include "src/core/lib/transport/pid_controller.h" #include <grpc/support/useful.h> -void grpc_pid_controller_init(grpc_pid_controller *pid_controller, - grpc_pid_controller_args args) { - pid_controller->args = args; - pid_controller->last_control_value = args.initial_control_value; - grpc_pid_controller_reset(pid_controller); -} +namespace grpc_core { -void grpc_pid_controller_reset(grpc_pid_controller *pid_controller) { - pid_controller->last_error = 0.0; - pid_controller->last_dc_dt = 0.0; - pid_controller->error_integral = 0.0; -} +PidController::PidController(const Args &args) + : last_control_value_(args.initial_control_value()), args_(args) {} -double grpc_pid_controller_update(grpc_pid_controller *pid_controller, - double error, double dt) { - if (dt == 0) return pid_controller->last_control_value; +double PidController::Update(double error, double dt) { + if (dt <= 0) return last_control_value_; /* integrate error using the trapezoid rule */ - pid_controller->error_integral += - dt * (pid_controller->last_error + error) * 0.5; - pid_controller->error_integral = GPR_CLAMP( - pid_controller->error_integral, -pid_controller->args.integral_range, - pid_controller->args.integral_range); - double diff_error = (error - pid_controller->last_error) / dt; + error_integral_ += dt * (last_error_ + error) * 0.5; + error_integral_ = GPR_CLAMP(error_integral_, -args_.integral_range(), + args_.integral_range()); + double diff_error = (error - last_error_) / dt; /* calculate derivative of control value vs time */ - double dc_dt = pid_controller->args.gain_p * error + - pid_controller->args.gain_i * pid_controller->error_integral + - pid_controller->args.gain_d * diff_error; + double dc_dt = args_.gain_p() * error + args_.gain_i() * error_integral_ + + args_.gain_d() * diff_error; /* and perform trapezoidal integration */ - double new_control_value = pid_controller->last_control_value + - dt * (pid_controller->last_dc_dt + dc_dt) * 0.5; - new_control_value = - GPR_CLAMP(new_control_value, pid_controller->args.min_control_value, - pid_controller->args.max_control_value); - pid_controller->last_error = error; - pid_controller->last_dc_dt = dc_dt; - pid_controller->last_control_value = new_control_value; + double new_control_value = + last_control_value_ + dt * (last_dc_dt_ + dc_dt) * 0.5; + new_control_value = GPR_CLAMP(new_control_value, args_.min_control_value(), + args_.max_control_value()); + last_error_ = error; + last_dc_dt_ = dc_dt; + last_control_value_ = new_control_value; return new_control_value; } -double grpc_pid_controller_last(grpc_pid_controller *pid_controller) { - return pid_controller->last_control_value; -} +} // namespace grpc_core diff --git a/src/core/lib/transport/pid_controller.h b/src/core/lib/transport/pid_controller.h index 80899e9a20..87e59a1a90 100644 --- a/src/core/lib/transport/pid_controller.h +++ b/src/core/lib/transport/pid_controller.h @@ -19,9 +19,7 @@ #ifndef GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H #define GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H -#ifdef __cplusplus -extern "C" { -#endif +#include <limits> /* \file Simple PID controller. Implements a proportional-integral-derivative controller. @@ -30,41 +28,87 @@ extern "C" { Gains can be set to adjust sensitivity to current error (p), the integral of error (i), and the derivative of error (d). */ -typedef struct { - double gain_p; - double gain_i; - double gain_d; - double initial_control_value; - double min_control_value; - double max_control_value; - double integral_range; -} grpc_pid_controller_args; +namespace grpc_core { -typedef struct { - double last_error; - double error_integral; - double last_control_value; - double last_dc_dt; - grpc_pid_controller_args args; -} grpc_pid_controller; +class PidController { + public: + class Args { + public: + double gain_p() const { return gain_p_; } + double gain_i() const { return gain_i_; } + double gain_d() const { return gain_d_; } + double initial_control_value() const { return initial_control_value_; } + double min_control_value() const { return min_control_value_; } + double max_control_value() const { return max_control_value_; } + double integral_range() const { return integral_range_; } -/** Initialize the controller */ -void grpc_pid_controller_init(grpc_pid_controller *pid_controller, - grpc_pid_controller_args args); + Args& set_gain_p(double gain_p) { + gain_p_ = gain_p; + return *this; + } + Args& set_gain_i(double gain_i) { + gain_i_ = gain_i; + return *this; + } + Args& set_gain_d(double gain_d) { + gain_d_ = gain_d; + return *this; + } + Args& set_initial_control_value(double initial_control_value) { + initial_control_value_ = initial_control_value; + return *this; + } + Args& set_min_control_value(double min_control_value) { + min_control_value_ = min_control_value; + return *this; + } + Args& set_max_control_value(double max_control_value) { + max_control_value_ = max_control_value; + return *this; + } + Args& set_integral_range(double integral_range) { + integral_range_ = integral_range; + return *this; + } -/** Reset the controller: useful when things have changed significantly */ -void grpc_pid_controller_reset(grpc_pid_controller *pid_controller); + private: + double gain_p_ = 0.0; + double gain_i_ = 0.0; + double gain_d_ = 0.0; + double initial_control_value_ = 0.0; + double min_control_value_ = std::numeric_limits<double>::min(); + double max_control_value_ = std::numeric_limits<double>::max(); + double integral_range_ = std::numeric_limits<double>::max(); + }; -/** Update the controller: given a current error estimate, and the time since - the last update, returns a new control value */ -double grpc_pid_controller_update(grpc_pid_controller *pid_controller, - double error, double dt); + explicit PidController(const Args& args); -/** Returns the last control value calculated */ -double grpc_pid_controller_last(grpc_pid_controller *pid_controller); + /// Reset the controller internal state: useful when the environment has + /// changed significantly + void Reset() { + last_error_ = 0.0; + last_dc_dt_ = 0.0; + error_integral_ = 0.0; + } -#ifdef __cplusplus -} -#endif + /// Update the controller: given a current error estimate, and the time since + /// the last update, returns a new control value + double Update(double error, double dt); + + /// Returns the last control value calculated + double last_control_value() const { return last_control_value_; } + + /// Returns the current error integral (mostly for testing) + double error_integral() const { return error_integral_; } + + private: + double last_error_ = 0.0; + double error_integral_ = 0.0; + double last_control_value_; + double last_dc_dt_ = 0.0; + const Args args_; +}; + +} // namespace grpc_core #endif /* GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H */ |