aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-10-18 14:47:18 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-10-18 14:47:18 -0700
commitd3d709a10fe2d3d9c2f2115bd4adad33e5a1ad0c (patch)
tree4a72ee6263f95fc39db944aab3699bc9f15922b8 /src/core/lib/iomgr
parentcdaf6d8e0660f83b708e4e73e90a4deaeab7ea2b (diff)
parentdcd7e80fdaf29b3fd081e7ac9df708d4c562eb79 (diff)
Merge github.com:grpc/grpc into tracer
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc1404
-rw-r--r--src/core/lib/iomgr/ev_posix.cc9
-rw-r--r--src/core/lib/iomgr/exec_ctx.cc9
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc2
4 files changed, 731 insertions, 693 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 58e5392c47..d30ca3fd5e 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -30,6 +30,7 @@
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
+#include <sys/syscall.h>
#include <unistd.h>
#include <grpc/support/alloc.h>
@@ -49,100 +50,96 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/spinlock.h"
-/*******************************************************************************
- * Polling object
- */
-typedef enum {
- PO_POLLING_GROUP,
- PO_POLLSET_SET,
- PO_POLLSET,
- PO_FD,
- /* ordering is important: we always want to lock pollsets before fds:
- this guarantees that using an fd as a pollable is safe */
- PO_EMPTY_POLLABLE,
- PO_COUNT
-} polling_obj_type;
-
-typedef struct polling_obj polling_obj;
-typedef struct polling_group polling_group;
-
-struct polling_obj {
- gpr_mu mu;
- polling_obj_type type;
- polling_group *group;
- struct polling_obj *next;
- struct polling_obj *prev;
-};
-
-struct polling_group {
- polling_obj po;
- gpr_refcount refs;
-};
+// debug aid: create workers on the heap (allows asan to spot
+// use-after-destruction)
+//#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1
-static void po_init(polling_obj *po, polling_obj_type type);
-static void po_destroy(polling_obj *po);
-static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b);
-static int po_cmp(polling_obj *a, polling_obj *b);
+#define MAX_EPOLL_EVENTS 100
+#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
-static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po,
- size_t initial_po_count);
-static polling_group *pg_ref(polling_group *pg);
-static void pg_unref(polling_group *pg);
-static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a,
- polling_group *b);
-static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg,
- polling_obj *po);
+#ifndef NDEBUG
+grpc_core::Tracer grpc_trace_pollable_refcount(false, "pollable_refcount");
+#endif
/*******************************************************************************
* pollable Declarations
*/
-typedef struct pollable {
- polling_obj po;
+typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type;
+
+typedef struct pollable pollable;
+
+/// A pollable is something that can be polled: it has an epoll set to poll on,
+/// and a wakeup fd for kicks
+/// There are three broad types:
+/// - PO_EMPTY - the empty pollable, used before file descriptors are added to
+/// a pollset
+/// - PO_FD - a pollable containing only one FD - used to optimize single-fd
+/// pollsets (which are common with synchronous api usage)
+/// - PO_MULTI - a pollable containing many fds
+struct pollable {
+ pollable_type type; // immutable
+ gpr_refcount refs;
+
int epfd;
grpc_wakeup_fd wakeup;
+
+ // only for type fd... one ref to the owner fd
+ grpc_fd *owner_fd;
+
+ grpc_pollset_set *pollset_set;
+ pollable *next;
+ pollable *prev;
+
+ gpr_mu mu;
grpc_pollset_worker *root_worker;
-} pollable;
-static const char *polling_obj_type_string(polling_obj_type t) {
+ int event_cursor;
+ int event_count;
+ struct epoll_event events[MAX_EPOLL_EVENTS];
+};
+
+static const char *pollable_type_string(pollable_type t) {
switch (t) {
- case PO_POLLING_GROUP:
- return "polling_group";
- case PO_POLLSET_SET:
- return "pollset_set";
- case PO_POLLSET:
+ case PO_MULTI:
return "pollset";
case PO_FD:
return "fd";
- case PO_EMPTY_POLLABLE:
- return "empty_pollable";
- case PO_COUNT:
- return "<invalid:count>";
+ case PO_EMPTY:
+ return "empty";
}
return "<invalid>";
}
static char *pollable_desc(pollable *p) {
char *out;
- gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d",
- polling_obj_type_string(p->po.type), p->po.group, p->epfd,
- p->wakeup.read_fd);
+ gpr_asprintf(&out, "type=%s epfd=%d wakeup=%d", pollable_type_string(p->type),
+ p->epfd, p->wakeup.read_fd);
return out;
}
-static pollable g_empty_pollable;
+/// Shared empty pollable - used by pollset to poll on until the first fd is
+/// added
+static pollable *g_empty_pollable;
-static void pollable_init(pollable *p, polling_obj_type type);
-static void pollable_destroy(pollable *p);
-/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
-static grpc_error *pollable_materialize(pollable *p);
+static grpc_error *pollable_create(pollable_type type, pollable **p);
+#ifdef NDEBUG
+static pollable *pollable_ref(pollable *p);
+static void pollable_unref(pollable *p);
+#define POLLABLE_REF(p, r) pollable_ref(p)
+#define POLLABLE_UNREF(p, r) pollable_unref(p)
+#else
+static pollable *pollable_ref(pollable *p, int line, const char *reason);
+static void pollable_unref(pollable *p, int line, const char *reason);
+#define POLLABLE_REF(p, r) pollable_ref((p), __LINE__, (r))
+#define POLLABLE_UNREF(p, r) pollable_unref((p), __LINE__, (r))
+#endif
/*******************************************************************************
* Fd Declarations
*/
struct grpc_fd {
- pollable pollable_obj;
int fd;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
@@ -150,11 +147,10 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst;
- /* The fd is either closed or we relinquished control of it. In either
- cases, this indicates that the 'fd' on this structure is no longer
- valid */
- gpr_mu orphaned_mu;
- bool orphaned;
+ gpr_mu orphan_mu;
+
+ gpr_mu pollable_mu;
+ pollable *pollable_obj;
gpr_atm read_closure;
gpr_atm write_closure;
@@ -176,47 +172,52 @@ static void fd_global_shutdown(void);
* Pollset Declarations
*/
-typedef struct pollset_worker_link {
+typedef struct {
grpc_pollset_worker *next;
grpc_pollset_worker *prev;
-} pollset_worker_link;
+} pwlink;
-typedef enum {
- PWL_POLLSET,
- PWL_POLLABLE,
- POLLSET_WORKER_LINK_COUNT
-} pollset_worker_links;
+typedef enum { PWLINK_POLLABLE = 0, PWLINK_POLLSET, PWLINK_COUNT } pwlinks;
struct grpc_pollset_worker {
bool kicked;
bool initialized_cv;
- pollset_worker_link links[POLLSET_WORKER_LINK_COUNT];
+#ifndef NDEBUG
+ // debug aid: which thread started this worker
+ pid_t originator;
+#endif
gpr_cv cv;
grpc_pollset *pollset;
pollable *pollable_obj;
-};
-#define MAX_EPOLL_EVENTS 100
-#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5
+ pwlink links[PWLINK_COUNT];
+};
struct grpc_pollset {
- pollable pollable_obj;
- pollable *current_pollable_obj;
- int kick_alls_pending;
+ gpr_mu mu;
+ pollable *active_pollable;
bool kicked_without_poller;
grpc_closure *shutdown_closure;
grpc_pollset_worker *root_worker;
-
- int event_cursor;
- int event_count;
- struct epoll_event events[MAX_EPOLL_EVENTS];
+ int containing_pollset_set_count;
};
/*******************************************************************************
* Pollset-set Declarations
*/
+
struct grpc_pollset_set {
- polling_obj po;
+ gpr_refcount refs;
+ gpr_mu mu;
+ grpc_pollset_set *parent;
+
+ size_t pollset_count;
+ size_t pollset_capacity;
+ grpc_pollset **pollsets;
+
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
};
/*******************************************************************************
@@ -250,11 +251,6 @@ static bool append_error(grpc_error **composite, grpc_error *error,
* becomes a spurious read notification on a reused fd.
*/
-/* The alarm system needs to be able to wakeup 'some poller' sometimes
- * (specifically when a new alarm needs to be triggered earlier than the next
- * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
- * case occurs. */
-
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
@@ -282,8 +278,9 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_fd *fd = (grpc_fd *)arg;
/* Add the fd to the freelist */
grpc_iomgr_unregister_object(&fd->iomgr_object);
- pollable_destroy(&fd->pollable_obj);
- gpr_mu_destroy(&fd->orphaned_mu);
+ POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
+ gpr_mu_destroy(&fd->pollable_mu);
+ gpr_mu_destroy(&fd->orphan_mu);
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
@@ -343,12 +340,11 @@ static grpc_fd *fd_create(int fd, const char *name) {
new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd));
}
- pollable_init(&new_fd->pollable_obj, PO_FD);
-
+ gpr_mu_init(&new_fd->pollable_mu);
+ gpr_mu_init(&new_fd->orphan_mu);
+ new_fd->pollable_obj = NULL;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
- gpr_mu_init(&new_fd->orphaned_mu);
- new_fd->orphaned = false;
grpc_lfev_init(&new_fd->read_closure);
grpc_lfev_init(&new_fd->write_closure);
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
@@ -369,24 +365,17 @@ static grpc_fd *fd_create(int fd, const char *name) {
}
static int fd_wrapped_fd(grpc_fd *fd) {
- int ret_fd = -1;
- gpr_mu_lock(&fd->orphaned_mu);
- if (!fd->orphaned) {
- ret_fd = fd->fd;
- }
- gpr_mu_unlock(&fd->orphaned_mu);
-
- return ret_fd;
+ int ret_fd = fd->fd;
+ return (gpr_atm_acq_load(&fd->refst) & 1) ? ret_fd : -1;
}
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
bool already_closed, const char *reason) {
bool is_fd_closed = already_closed;
- grpc_error *error = GRPC_ERROR_NONE;
- gpr_mu_lock(&fd->pollable_obj.po.mu);
- gpr_mu_lock(&fd->orphaned_mu);
+ gpr_mu_lock(&fd->orphan_mu);
+
fd->on_done_closure = on_done;
/* If release_fd is not NULL, we should be relinquishing control of the file
@@ -398,8 +387,6 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
is_fd_closed = true;
}
- fd->orphaned = true;
-
if (!is_fd_closed) {
gpr_log(GPR_DEBUG, "TODO: handle fd removal?");
}
@@ -408,13 +395,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
to be alive (and not added to freelist) until the end of this function */
REF_BY(fd, 1, reason);
- GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
+
+ gpr_mu_unlock(&fd->orphan_mu);
- gpr_mu_unlock(&fd->orphaned_mu);
- gpr_mu_unlock(&fd->pollable_obj.po.mu);
UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */
- GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
- GRPC_ERROR_UNREF(error);
}
static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
@@ -451,63 +436,87 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
* Pollable Definitions
*/
-static void pollable_init(pollable *p, polling_obj_type type) {
- po_init(&p->po, type);
- p->root_worker = NULL;
- p->epfd = -1;
+static grpc_error *pollable_create(pollable_type type, pollable **p) {
+ *p = NULL;
+
+ int epfd = epoll_create1(EPOLL_CLOEXEC);
+ if (epfd == -1) {
+ return GRPC_OS_ERROR(errno, "epoll_create1");
+ }
+ *p = (pollable *)gpr_malloc(sizeof(**p));
+ grpc_error *err = grpc_wakeup_fd_init(&(*p)->wakeup);
+ if (err != GRPC_ERROR_NONE) {
+ close(epfd);
+ gpr_free(*p);
+ *p = NULL;
+ return err;
+ }
+ struct epoll_event ev;
+ ev.events = (uint32_t)(EPOLLIN | EPOLLET);
+ ev.data.ptr = (void *)(1 | (intptr_t) & (*p)->wakeup);
+ if (epoll_ctl(epfd, EPOLL_CTL_ADD, (*p)->wakeup.read_fd, &ev) != 0) {
+ err = GRPC_OS_ERROR(errno, "epoll_ctl");
+ close(epfd);
+ grpc_wakeup_fd_destroy(&(*p)->wakeup);
+ gpr_free(*p);
+ *p = NULL;
+ return err;
+ }
+
+ (*p)->type = type;
+ gpr_ref_init(&(*p)->refs, 1);
+ gpr_mu_init(&(*p)->mu);
+ (*p)->epfd = epfd;
+ (*p)->owner_fd = NULL;
+ (*p)->pollset_set = NULL;
+ (*p)->next = (*p)->prev = *p;
+ (*p)->root_worker = NULL;
+ (*p)->event_cursor = 0;
+ (*p)->event_count = 0;
+ return GRPC_ERROR_NONE;
}
-static void pollable_destroy(pollable *p) {
- po_destroy(&p->po);
- if (p->epfd != -1) {
- close(p->epfd);
- grpc_wakeup_fd_destroy(&p->wakeup);
+#ifdef NDEBUG
+static pollable *pollable_ref(pollable *p) {
+#else
+static pollable *pollable_ref(pollable *p, int line, const char *reason) {
+ if (grpc_trace_pollable_refcount.enabled()) {
+ int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
+ gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
+ "POLLABLE:%p ref %d->%d %s", p, r, r + 1, reason);
}
+#endif
+ gpr_ref(&p->refs);
+ return p;
}
-/* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */
-static grpc_error *pollable_materialize(pollable *p) {
- if (p->epfd == -1) {
- int new_epfd = epoll_create1(EPOLL_CLOEXEC);
- if (new_epfd < 0) {
- return GRPC_OS_ERROR(errno, "epoll_create1");
- }
- grpc_error *err = grpc_wakeup_fd_init(&p->wakeup);
- if (err != GRPC_ERROR_NONE) {
- close(new_epfd);
- return err;
- }
- struct epoll_event ev;
- ev.events = (uint32_t)(EPOLLIN | EPOLLET);
- ev.data.ptr = (void *)(1 | (intptr_t)&p->wakeup);
- if (epoll_ctl(new_epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) {
- err = GRPC_OS_ERROR(errno, "epoll_ctl");
- close(new_epfd);
- grpc_wakeup_fd_destroy(&p->wakeup);
- return err;
- }
-
- p->epfd = new_epfd;
+#ifdef NDEBUG
+static void pollable_unref(pollable *p) {
+#else
+static void pollable_unref(pollable *p, int line, const char *reason) {
+ if (p == NULL) return;
+ if (grpc_trace_pollable_refcount.enabled()) {
+ int r = (int)gpr_atm_no_barrier_load(&p->refs.count);
+ gpr_log(__FILE__, line, GPR_LOG_SEVERITY_DEBUG,
+ "POLLABLE:%p unref %d->%d %s", p, r, r - 1, reason);
+ }
+#endif
+ if (p != NULL && gpr_unref(&p->refs)) {
+ close(p->epfd);
+ grpc_wakeup_fd_destroy(&p->wakeup);
+ gpr_free(p);
}
- return GRPC_ERROR_NONE;
}
-/* pollable must be materialized */
static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollable_add_fd";
const int epfd = p->epfd;
- GPR_ASSERT(epfd != -1);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
}
- gpr_mu_lock(&fd->orphaned_mu);
- if (fd->orphaned) {
- gpr_mu_unlock(&fd->orphaned_mu);
- return GRPC_ERROR_NONE;
- }
struct epoll_event ev_fd;
ev_fd.events = (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
ev_fd.data.ptr = fd;
@@ -519,7 +528,6 @@ static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) {
append_error(&error, GRPC_OS_ERROR(errno, "epoll_ctl"), err_desc);
}
}
- gpr_mu_unlock(&fd->orphaned_mu);
return error;
}
@@ -535,128 +543,67 @@ GPR_TLS_DECL(g_current_thread_worker);
static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_pollset);
gpr_tls_init(&g_current_thread_worker);
- pollable_init(&g_empty_pollable, PO_EMPTY_POLLABLE);
- return GRPC_ERROR_NONE;
+ return pollable_create(PO_EMPTY, &g_empty_pollable);
}
static void pollset_global_shutdown(void) {
- pollable_destroy(&g_empty_pollable);
+ POLLABLE_UNREF(g_empty_pollable, "g_empty_pollable");
gpr_tls_destroy(&g_current_thread_pollset);
gpr_tls_destroy(&g_current_thread_worker);
}
+/* pollset->mu must be held while calling this function */
static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) "
+ "rw=%p (target:NULL) cpsc=%d (target:0)",
+ pollset, pollset->active_pollable, pollset->shutdown_closure,
+ pollset->root_worker, pollset->containing_pollset_set_count);
+ }
if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
- pollset->kick_alls_pending == 0) {
+ pollset->containing_pollset_set_count == 0) {
GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
pollset->shutdown_closure = NULL;
}
}
-static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error_unused) {
- grpc_error *error = GRPC_ERROR_NONE;
- grpc_pollset *pollset = (grpc_pollset *)arg;
- gpr_mu_lock(&pollset->pollable_obj.po.mu);
- if (pollset->root_worker != NULL) {
- grpc_pollset_worker *worker = pollset->root_worker;
- do {
- GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
- if (worker->pollable_obj != &pollset->pollable_obj) {
- gpr_mu_lock(&worker->pollable_obj->po.mu);
- }
- if (worker->initialized_cv && worker != pollset->root_worker) {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)",
- pollset, worker, &pollset->pollable_obj,
- worker->pollable_obj);
- }
- worker->kicked = true;
- gpr_cv_signal(&worker->cv);
- } else {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)",
- pollset, worker, &pollset->pollable_obj,
- worker->pollable_obj);
- }
- append_error(&error,
- grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup),
- "pollset_shutdown");
- }
- if (worker->pollable_obj != &pollset->pollable_obj) {
- gpr_mu_unlock(&worker->pollable_obj->po.mu);
- }
-
- worker = worker->links[PWL_POLLSET].next;
- } while (worker != pollset->root_worker);
- }
- pollset->kick_alls_pending--;
- pollset_maybe_finish_shutdown(exec_ctx, pollset);
- gpr_mu_unlock(&pollset->pollable_obj.po.mu);
- GRPC_LOG_IF_ERROR("kick_all", error);
-}
-
-static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
- pollset->kick_alls_pending++;
- GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(do_kick_all, pollset,
- grpc_schedule_on_exec_ctx),
- GRPC_ERROR_NONE);
-}
-
-static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
- grpc_pollset_worker *specific_worker) {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "PS:%p kick %p tls_pollset=%p tls_worker=%p "
- "root_worker=(pollset:%p pollable:%p)",
- p, specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
- (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker,
- p->root_worker);
- }
- if (specific_worker == NULL) {
- if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
- if (pollset->root_worker == NULL) {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", p);
- }
- pollset->kicked_without_poller = true;
- return GRPC_ERROR_NONE;
- } else {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_any_via_wakeup_fd", p);
- }
- grpc_error *err = pollable_materialize(p);
- if (err != GRPC_ERROR_NONE) return err;
- return grpc_wakeup_fd_wakeup(&p->wakeup);
- }
- } else {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", p);
- }
- return GRPC_ERROR_NONE;
- }
- } else if (specific_worker->kicked) {
+/* pollset->mu must be held before calling this function,
+ * pollset->active_pollable->mu & specific_worker->pollable_obj->mu must not be
+ * held */
+static grpc_error *kick_one_worker(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_worker *specific_worker) {
+ pollable *p = specific_worker->pollable_obj;
+ grpc_core::mu_guard lock(&p->mu);
+ GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
+ GPR_ASSERT(specific_worker != NULL);
+ if (specific_worker->kicked) {
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_already_kicked", p);
}
+ GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
return GRPC_ERROR_NONE;
- } else if (gpr_tls_get(&g_current_thread_worker) ==
- (intptr_t)specific_worker) {
+ }
+ if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_but_awake", p);
}
+ GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
specific_worker->kicked = true;
return GRPC_ERROR_NONE;
- } else if (specific_worker == p->root_worker) {
+ }
+ if (specific_worker == p->root_worker) {
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD(exec_ctx);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_wakeup_fd", p);
}
- grpc_error *err = pollable_materialize(p);
- if (err != GRPC_ERROR_NONE) return err;
specific_worker->kicked = true;
- return grpc_wakeup_fd_wakeup(&p->wakeup);
- } else {
+ grpc_error *error = grpc_wakeup_fd_wakeup(&p->wakeup);
+ return error;
+ }
+ if (specific_worker->initialized_cv) {
+ GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV(exec_ctx);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p kicked_specific_via_cv", p);
}
@@ -664,30 +611,79 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p,
gpr_cv_signal(&specific_worker->cv);
return GRPC_ERROR_NONE;
}
+ // we can get here during end_worker after removing specific_worker from the
+ // pollable list but before removing it from the pollset list
+ return GRPC_ERROR_NONE;
}
-/* p->po.mu must be held before calling this function */
static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
- pollable *p = pollset->current_pollable_obj;
- GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
- if (p != &pollset->pollable_obj) {
- gpr_mu_lock(&p->po.mu);
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "PS:%p kick %p tls_pollset=%p tls_worker=%p pollset.root_worker=%p",
+ pollset, specific_worker,
+ (void *)gpr_tls_get(&g_current_thread_pollset),
+ (void *)gpr_tls_get(&g_current_thread_worker),
+ pollset->root_worker);
+ }
+ if (specific_worker == NULL) {
+ if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
+ if (pollset->root_worker == NULL) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PS:%p kicked_any_without_poller", pollset);
+ }
+ GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER(exec_ctx);
+ pollset->kicked_without_poller = true;
+ return GRPC_ERROR_NONE;
+ } else {
+ // We've been asked to kick a poller, but we haven't been told which one
+ // ... any will do
+ // We look at the pollset worker list because:
+ // 1. the pollable list may include workers from other pollers, so we'd
+ // need to do an O(N) search
+ // 2. we'd additionally need to take the pollable lock, which we've so
+ // far avoided
+ // Now, we would prefer to wake a poller in cv_wait, and not in
+ // epoll_wait (since the latter would imply the need to do an additional
+ // wakeup)
+ // We know that if a worker is at the root of a pollable, it's (likely)
+ // also the root of a pollset, and we know that if a worker is NOT at
+ // the root of a pollset, it's (likely) not at the root of a pollable,
+ // so we take our chances and choose the SECOND worker enqueued against
+ // the pollset as a worker that's likely to be in cv_wait
+ return kick_one_worker(
+ exec_ctx, pollset->root_worker->links[PWLINK_POLLSET].next);
+ }
+ } else {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PS:%p kicked_any_but_awake", pollset);
+ }
+ GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD(exec_ctx);
+ return GRPC_ERROR_NONE;
+ }
+ } else {
+ return kick_one_worker(exec_ctx, specific_worker);
}
- grpc_error *error = pollset_kick_inner(pollset, p, specific_worker);
- if (p != &pollset->pollable_obj) {
- gpr_mu_unlock(&p->po.mu);
+}
+
+static grpc_error *pollset_kick_all(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ const char *err_desc = "pollset_kick_all";
+ grpc_pollset_worker *w = pollset->root_worker;
+ if (w != NULL) {
+ do {
+ append_error(&error, kick_one_worker(exec_ctx, w), err_desc);
+ w = w->links[PWLINK_POLLSET].next;
+ } while (w != pollset->root_worker);
}
return error;
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
- pollable_init(&pollset->pollable_obj, PO_POLLSET);
- pollset->current_pollable_obj = &g_empty_pollable;
- pollset->kicked_without_poller = false;
- pollset->shutdown_closure = NULL;
- pollset->root_worker = NULL;
- *mu = &pollset->pollable_obj.po.mu;
+ gpr_mu_init(&pollset->mu);
+ pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset");
+ *mu = &pollset->mu;
}
static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx,
@@ -719,12 +715,29 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
-static grpc_error *fd_become_pollable_locked(grpc_fd *fd) {
+static grpc_error *fd_get_or_become_pollable(grpc_fd *fd, pollable **p) {
+ gpr_mu_lock(&fd->pollable_mu);
grpc_error *error = GRPC_ERROR_NONE;
- static const char *err_desc = "fd_become_pollable";
- if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) {
- append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc);
+ static const char *err_desc = "fd_get_or_become_pollable";
+ if (fd->pollable_obj == NULL) {
+ if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj),
+ err_desc)) {
+ fd->pollable_obj->owner_fd = fd;
+ if (!append_error(&error, pollable_add_fd(fd->pollable_obj, fd),
+ err_desc)) {
+ POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
+ fd->pollable_obj = NULL;
+ }
+ }
}
+ if (error == GRPC_ERROR_NONE) {
+ GPR_ASSERT(fd->pollable_obj != NULL);
+ *p = POLLABLE_REF(fd->pollable_obj, "pollset");
+ } else {
+ GPR_ASSERT(fd->pollable_obj == NULL);
+ *p = NULL;
+ }
+ gpr_mu_unlock(&fd->pollable_mu);
return error;
}
@@ -733,23 +746,20 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(pollset->shutdown_closure == NULL);
pollset->shutdown_closure = closure;
- pollset_kick_all(exec_ctx, pollset);
+ GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(exec_ctx, pollset));
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
-static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) {
- return p != &g_empty_pollable && p != &pollset->pollable_obj;
-}
-
-static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset, bool drain) {
+static grpc_error *pollable_process_events(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ pollable *pollable_obj, bool drain) {
static const char *err_desc = "pollset_process_events";
grpc_error *error = GRPC_ERROR_NONE;
for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) &&
- pollset->event_cursor != pollset->event_count;
+ pollable_obj->event_cursor != pollable_obj->event_count;
i++) {
- int n = pollset->event_cursor++;
- struct epoll_event *ev = &pollset->events[n];
+ int n = pollable_obj->event_cursor++;
+ struct epoll_event *ev = &pollable_obj->events[n];
void *data_ptr = ev->data.ptr;
if (1 & (intptr_t)data_ptr) {
if (grpc_polling_trace.enabled()) {
@@ -784,22 +794,17 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx,
/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
- pollable_destroy(&pollset->pollable_obj);
- if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) {
- UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2,
- "pollset_pollable");
- }
- GRPC_LOG_IF_ERROR("pollset_process_events",
- pollset_process_events(exec_ctx, pollset, true));
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ pollset->active_pollable = NULL;
}
-static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- pollable *p, grpc_millis deadline) {
+static grpc_error *pollable_epoll(grpc_exec_ctx *exec_ctx, pollable *p,
+ grpc_millis deadline) {
int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline);
if (grpc_polling_trace.enabled()) {
char *desc = pollable_desc(p);
- gpr_log(GPR_DEBUG, "PS:%p poll %p[%s] for %dms", pollset, p, desc, timeout);
+ gpr_log(GPR_DEBUG, "POLLABLE:%p[%s] poll for %dms", p, desc, timeout);
gpr_free(desc);
}
@@ -809,7 +814,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
int r;
do {
GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
- r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout);
+ r = epoll_wait(p->epfd, p->events, MAX_EPOLL_EVENTS, timeout);
} while (r < 0 && errno == EINTR);
if (timeout != 0) {
GRPC_SCHEDULING_END_BLOCKING_REGION_WITH_EXEC_CTX(exec_ctx);
@@ -818,24 +823,24 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p poll %p got %d events", pollset, p, r);
+ gpr_log(GPR_DEBUG, "POLLABLE:%p got %d events", p, r);
}
- pollset->event_cursor = 0;
- pollset->event_count = r;
+ p->event_cursor = 0;
+ p->event_count = r;
return GRPC_ERROR_NONE;
}
/* Return true if first in list */
-static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link,
- grpc_pollset_worker *worker) {
- if (*root == NULL) {
- *root = worker;
+static bool worker_insert(grpc_pollset_worker **root_worker,
+ grpc_pollset_worker *worker, pwlinks link) {
+ if (*root_worker == NULL) {
+ *root_worker = worker;
worker->links[link].next = worker->links[link].prev = worker;
return true;
} else {
- worker->links[link].next = *root;
+ worker->links[link].next = *root_worker;
worker->links[link].prev = worker->links[link].next->links[link].prev;
worker->links[link].next->links[link].prev = worker;
worker->links[link].prev->links[link].next = worker;
@@ -843,26 +848,26 @@ static bool worker_insert(grpc_pollset_worker **root, pollset_worker_links link,
}
}
-/* Return true if last in list */
-typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
+/* returns the new root IFF the root changed */
+typedef enum { WRR_NEW_ROOT, WRR_EMPTIED, WRR_REMOVED } worker_remove_result;
-static worker_remove_result worker_remove(grpc_pollset_worker **root,
- pollset_worker_links link,
- grpc_pollset_worker *worker) {
- if (worker == *root) {
+static worker_remove_result worker_remove(grpc_pollset_worker **root_worker,
+ grpc_pollset_worker *worker,
+ pwlinks link) {
+ if (worker == *root_worker) {
if (worker == worker->links[link].next) {
- *root = NULL;
- return EMPTIED;
+ *root_worker = NULL;
+ return WRR_EMPTIED;
} else {
- *root = worker->links[link].next;
+ *root_worker = worker->links[link].next;
worker->links[link].prev->links[link].next = worker->links[link].next;
worker->links[link].next->links[link].prev = worker->links[link].prev;
- return NEW_ROOT;
+ return WRR_NEW_ROOT;
}
} else {
worker->links[link].prev->links[link].next = worker->links[link].next;
worker->links[link].next->links[link].prev = worker->links[link].prev;
- return REMOVED;
+ return WRR_REMOVED;
}
}
@@ -871,25 +876,20 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl,
grpc_millis deadline) {
- bool do_poll = true;
+ bool do_poll = (pollset->shutdown_closure == nullptr);
if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false;
worker->kicked = false;
worker->pollset = pollset;
- worker->pollable_obj = pollset->current_pollable_obj;
-
- if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) {
- REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll");
- }
-
- worker_insert(&pollset->root_worker, PWL_POLLSET, worker);
- if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE,
- worker)) {
+ worker->pollable_obj =
+ POLLABLE_REF(pollset->active_pollable, "pollset_worker");
+ worker_insert(&pollset->root_worker, worker, PWLINK_POLLSET);
+ gpr_mu_lock(&worker->pollable_obj->mu);
+ if (!worker_insert(&worker->pollable_obj->root_worker, worker,
+ PWLINK_POLLABLE)) {
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
- if (worker->pollable_obj != &pollset->pollable_obj) {
- gpr_mu_unlock(&pollset->pollable_obj.po.mu);
- }
+ gpr_mu_unlock(&pollset->mu);
if (grpc_polling_trace.enabled() &&
worker->pollable_obj->root_worker != worker) {
gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset,
@@ -897,7 +897,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
poll_deadline_to_millis_timeout(exec_ctx, deadline));
}
while (do_poll && worker->pollable_obj->root_worker != worker) {
- if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu,
+ if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->mu,
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) {
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset,
@@ -916,158 +916,232 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
worker->pollable_obj, worker);
}
}
- if (worker->pollable_obj != &pollset->pollable_obj) {
- gpr_mu_unlock(&worker->pollable_obj->po.mu);
- gpr_mu_lock(&pollset->pollable_obj.po.mu);
- gpr_mu_lock(&worker->pollable_obj->po.mu);
- }
grpc_exec_ctx_invalidate_now(exec_ctx);
+ } else {
+ gpr_mu_unlock(&pollset->mu);
}
+ gpr_mu_unlock(&worker->pollable_obj->mu);
- return do_poll && pollset->shutdown_closure == NULL &&
- pollset->current_pollable_obj == worker->pollable_obj;
+ return do_poll;
}
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
- if (NEW_ROOT ==
- worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) {
- gpr_cv_signal(&worker->pollable_obj->root_worker->cv);
+ gpr_mu_lock(&pollset->mu);
+ gpr_mu_lock(&worker->pollable_obj->mu);
+ switch (worker_remove(&worker->pollable_obj->root_worker, worker,
+ PWLINK_POLLABLE)) {
+ case WRR_NEW_ROOT: {
+ // wakeup new poller
+ grpc_pollset_worker *new_root = worker->pollable_obj->root_worker;
+ GPR_ASSERT(new_root->initialized_cv);
+ gpr_cv_signal(&new_root->cv);
+ break;
+ }
+ case WRR_EMPTIED:
+ if (pollset->active_pollable != worker->pollable_obj) {
+ // pollable no longer being polled: flush events
+ pollable_process_events(exec_ctx, pollset, worker->pollable_obj, true);
+ }
+ break;
+ case WRR_REMOVED:
+ break;
+ }
+ gpr_mu_unlock(&worker->pollable_obj->mu);
+ POLLABLE_UNREF(worker->pollable_obj, "pollset_worker");
+ if (worker_remove(&pollset->root_worker, worker, PWLINK_POLLSET) ==
+ WRR_EMPTIED) {
+ pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
- if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) {
- UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll");
- }
- if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) {
- pollset_maybe_finish_shutdown(exec_ctx, pollset);
- }
}
-/* pollset->po.mu lock must be held by the caller before calling this.
+#ifndef NDEBUG
+static long gettid(void) { return syscall(__NR_gettid); }
+#endif
+
+/* pollset->mu lock must be held by the caller before calling this.
The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
grpc_millis deadline) {
+#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP
+ grpc_pollset_worker *worker =
+ (grpc_pollset_worker *)gpr_malloc(sizeof(*worker));
+#define WORKER_PTR (worker)
+#else
grpc_pollset_worker worker;
- if (0 && grpc_polling_trace.enabled()) {
+#define WORKER_PTR (&worker)
+#endif
+#ifndef NDEBUG
+ WORKER_PTR->originator = gettid();
+#endif
+ if (grpc_polling_trace.enabled()) {
gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRIdPTR
- " deadline=%" PRIdPTR " kwp=%d root_worker=%p",
- pollset, worker_hdl, &worker, grpc_exec_ctx_now(exec_ctx), deadline,
- pollset->kicked_without_poller, pollset->root_worker);
+ " deadline=%" PRIdPTR " kwp=%d pollable=%p",
+ pollset, worker_hdl, WORKER_PTR, grpc_exec_ctx_now(exec_ctx),
+ deadline, pollset->kicked_without_poller, pollset->active_pollable);
}
- grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_work";
+ grpc_error *error = GRPC_ERROR_NONE;
if (pollset->kicked_without_poller) {
pollset->kicked_without_poller = false;
- return GRPC_ERROR_NONE;
- }
- if (pollset->current_pollable_obj != &pollset->pollable_obj) {
- gpr_mu_lock(&pollset->current_pollable_obj->po.mu);
- }
- if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) {
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
- gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
- GPR_ASSERT(!pollset->shutdown_closure);
- append_error(&error, pollable_materialize(worker.pollable_obj), err_desc);
- if (worker.pollable_obj != &pollset->pollable_obj) {
- gpr_mu_unlock(&worker.pollable_obj->po.mu);
- }
- gpr_mu_unlock(&pollset->pollable_obj.po.mu);
- if (pollset->event_cursor == pollset->event_count) {
- append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj,
- deadline),
+ } else {
+ if (begin_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl, deadline)) {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+ gpr_tls_set(&g_current_thread_worker, (intptr_t)WORKER_PTR);
+ if (WORKER_PTR->pollable_obj->event_cursor ==
+ WORKER_PTR->pollable_obj->event_count) {
+ append_error(&error, pollable_epoll(exec_ctx, WORKER_PTR->pollable_obj,
+ deadline),
+ err_desc);
+ }
+ append_error(&error,
+ pollable_process_events(exec_ctx, pollset,
+ WORKER_PTR->pollable_obj, false),
err_desc);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_tls_set(&g_current_thread_pollset, 0);
+ gpr_tls_set(&g_current_thread_worker, 0);
}
- append_error(&error, pollset_process_events(exec_ctx, pollset, false),
- err_desc);
- gpr_mu_lock(&pollset->pollable_obj.po.mu);
- if (worker.pollable_obj != &pollset->pollable_obj) {
- gpr_mu_lock(&worker.pollable_obj->po.mu);
- }
- gpr_tls_set(&g_current_thread_pollset, 0);
- gpr_tls_set(&g_current_thread_worker, 0);
- pollset_maybe_finish_shutdown(exec_ctx, pollset);
+ end_worker(exec_ctx, pollset, WORKER_PTR, worker_hdl);
}
- end_worker(exec_ctx, pollset, &worker, worker_hdl);
- if (worker.pollable_obj != &pollset->pollable_obj) {
- gpr_mu_unlock(&worker.pollable_obj->po.mu);
- }
- if (grpc_exec_ctx_has_work(exec_ctx)) {
- gpr_mu_unlock(&pollset->pollable_obj.po.mu);
- grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->pollable_obj.po.mu);
+#ifdef GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP
+ gpr_free(worker);
+#endif
+#undef WORKER_PTR
+ return error;
+}
+
+static grpc_error *pollset_transition_pollable_from_empty_to_fd_locked(
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) {
+ static const char *err_desc = "pollset_transition_pollable_from_empty_to_fd";
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "PS:%p add fd %p (%d); transition pollable from empty to fd",
+ pollset, fd, fd->fd);
}
+ append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ append_error(&error, fd_get_or_become_pollable(fd, &pollset->active_pollable),
+ err_desc);
return error;
}
-static void unref_fd_no_longer_poller(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_fd *fd = (grpc_fd *)arg;
- UNREF_BY(exec_ctx, fd, 2, "pollset_pollable");
+static grpc_error *pollset_transition_pollable_from_fd_to_multi_locked(
+ grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *and_add_fd) {
+ static const char *err_desc = "pollset_transition_pollable_from_fd_to_multi";
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(
+ GPR_DEBUG,
+ "PS:%p add fd %p (%d); transition pollable from fd %p to multipoller",
+ pollset, and_add_fd, and_add_fd ? and_add_fd->fd : -1,
+ pollset->active_pollable->owner_fd);
+ }
+ append_error(&error, pollset_kick_all(exec_ctx, pollset), err_desc);
+ grpc_fd *initial_fd = pollset->active_pollable->owner_fd;
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ pollset->active_pollable = NULL;
+ if (append_error(&error, pollable_create(PO_MULTI, &pollset->active_pollable),
+ err_desc)) {
+ append_error(&error, pollable_add_fd(pollset->active_pollable, initial_fd),
+ err_desc);
+ if (and_add_fd != NULL) {
+ append_error(&error,
+ pollable_add_fd(pollset->active_pollable, and_add_fd),
+ err_desc);
+ }
+ }
+ return error;
}
/* expects pollsets locked, flag whether fd is locked or not */
static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset, grpc_fd *fd,
- bool fd_locked) {
- static const char *err_desc = "pollset_add_fd";
+ grpc_pollset *pollset, grpc_fd *fd) {
grpc_error *error = GRPC_ERROR_NONE;
- if (pollset->current_pollable_obj == &g_empty_pollable) {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "PS:%p add fd %p; transition pollable from empty to fd", pollset,
- fd);
- }
- /* empty pollable --> single fd pollable */
- pollset_kick_all(exec_ctx, pollset);
- pollset->current_pollable_obj = &fd->pollable_obj;
- if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu);
- append_error(&error, fd_become_pollable_locked(fd), err_desc);
- if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu);
- REF_BY(fd, 2, "pollset_pollable");
- } else if (pollset->current_pollable_obj == &pollset->pollable_obj) {
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd);
- }
- append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd),
- err_desc);
- } else if (pollset->current_pollable_obj != &fd->pollable_obj) {
- grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj;
- if (grpc_polling_trace.enabled()) {
- gpr_log(GPR_DEBUG,
- "PS:%p add fd %p; transition pollable from fd %p to multipoller",
- pollset, fd, had_fd);
- }
- /* Introduce a spurious completion.
- If we do not, then it may be that the fd-specific epoll set consumed
- a completion without being polled, leading to a missed edge going up. */
- grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read");
- grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write");
- pollset_kick_all(exec_ctx, pollset);
- pollset->current_pollable_obj = &pollset->pollable_obj;
- if (append_error(&error, pollable_materialize(&pollset->pollable_obj),
- err_desc)) {
- pollable_add_fd(&pollset->pollable_obj, had_fd);
- pollable_add_fd(&pollset->pollable_obj, fd);
- }
- GRPC_CLOSURE_SCHED(exec_ctx,
- GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd,
- grpc_schedule_on_exec_ctx),
- GRPC_ERROR_NONE);
+ pollable *po_at_start =
+ POLLABLE_REF(pollset->active_pollable, "pollset_add_fd");
+ switch (pollset->active_pollable->type) {
+ case PO_EMPTY:
+ /* empty pollable --> single fd pollable */
+ error = pollset_transition_pollable_from_empty_to_fd_locked(exec_ctx,
+ pollset, fd);
+ break;
+ case PO_FD:
+ gpr_mu_lock(&po_at_start->owner_fd->orphan_mu);
+ if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) &
+ 1) == 0) {
+ error = pollset_transition_pollable_from_empty_to_fd_locked(
+ exec_ctx, pollset, fd);
+ } else {
+ /* fd --> multipoller */
+ error = pollset_transition_pollable_from_fd_to_multi_locked(
+ exec_ctx, pollset, fd);
+ }
+ gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu);
+ break;
+ case PO_MULTI:
+ error = pollable_add_fd(pollset->active_pollable, fd);
+ break;
+ }
+ if (error != GRPC_ERROR_NONE) {
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ pollset->active_pollable = po_at_start;
+ } else {
+ POLLABLE_UNREF(po_at_start, "pollset_add_fd");
+ }
+ return error;
+}
+
+static grpc_error *pollset_as_multipollable_locked(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset,
+ pollable **pollable_obj) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ pollable *po_at_start =
+ POLLABLE_REF(pollset->active_pollable, "pollset_as_multipollable");
+ switch (pollset->active_pollable->type) {
+ case PO_EMPTY:
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ error = pollable_create(PO_MULTI, &pollset->active_pollable);
+ break;
+ case PO_FD:
+ gpr_mu_lock(&po_at_start->owner_fd->orphan_mu);
+ if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) &
+ 1) == 0) {
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ error = pollable_create(PO_MULTI, &pollset->active_pollable);
+ } else {
+ error = pollset_transition_pollable_from_fd_to_multi_locked(
+ exec_ctx, pollset, NULL);
+ }
+ gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu);
+ break;
+ case PO_MULTI:
+ break;
+ }
+ if (error != GRPC_ERROR_NONE) {
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ pollset->active_pollable = po_at_start;
+ *pollable_obj = NULL;
+ } else {
+ *pollable_obj = POLLABLE_REF(pollset->active_pollable, "pollset_set");
+ POLLABLE_UNREF(po_at_start, "pollset_as_multipollable");
}
return error;
}
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_fd *fd) {
- gpr_mu_lock(&pollset->pollable_obj.po.mu);
- grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false);
- gpr_mu_unlock(&pollset->pollable_obj.po.mu);
+ gpr_mu_lock(&pollset->mu);
+ grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd);
+ gpr_mu_unlock(&pollset->mu);
GRPC_LOG_IF_ERROR("pollset_add_fd", error);
}
@@ -1075,301 +1149,255 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* Pollset-set Definitions
*/
+static grpc_pollset_set *pss_lock_adam(grpc_pollset_set *pss) {
+ gpr_mu_lock(&pss->mu);
+ while (pss->parent != NULL) {
+ gpr_mu_unlock(&pss->mu);
+ pss = pss->parent;
+ gpr_mu_lock(&pss->mu);
+ }
+ return pss;
+}
+
static grpc_pollset_set *pollset_set_create(void) {
grpc_pollset_set *pss = (grpc_pollset_set *)gpr_zalloc(sizeof(*pss));
- po_init(&pss->po, PO_POLLSET_SET);
+ gpr_mu_init(&pss->mu);
+ gpr_ref_init(&pss->refs, 1);
return pss;
}
-static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss) {
- po_destroy(&pss->po);
+static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) {
+ if (pss == NULL) return;
+ if (!gpr_unref(&pss->refs)) return;
+ pollset_set_unref(exec_ctx, pss->parent);
+ gpr_mu_destroy(&pss->mu);
+ for (size_t i = 0; i < pss->pollset_count; i++) {
+ gpr_mu_lock(&pss->pollsets[i]->mu);
+ if (0 == --pss->pollsets[i]->containing_pollset_set_count) {
+ pollset_maybe_finish_shutdown(exec_ctx, pss->pollsets[i]);
+ }
+ gpr_mu_unlock(&pss->pollsets[i]->mu);
+ }
+ for (size_t i = 0; i < pss->fd_count; i++) {
+ UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set");
+ }
+ gpr_free(pss->pollsets);
+ gpr_free(pss->fds);
gpr_free(pss);
}
static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
grpc_fd *fd) {
- po_join(exec_ctx, &pss->po, &fd->pollable_obj.po);
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS:%p: add fd %p (%d)", pss, fd, fd->fd);
+ }
+ grpc_error *error = GRPC_ERROR_NONE;
+ static const char *err_desc = "pollset_set_add_fd";
+ pss = pss_lock_adam(pss);
+ for (size_t i = 0; i < pss->pollset_count; i++) {
+ append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd),
+ err_desc);
+ }
+ if (pss->fd_count == pss->fd_capacity) {
+ pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8);
+ pss->fds =
+ (grpc_fd **)gpr_realloc(pss->fds, pss->fd_capacity * sizeof(*pss->fds));
+ }
+ REF_BY(fd, 2, "pollset_set");
+ pss->fds[pss->fd_count++] = fd;
+ gpr_mu_unlock(&pss->mu);
+
+ GRPC_LOG_IF_ERROR(err_desc, error);
}
static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
- grpc_fd *fd) {}
-
-static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss, grpc_pollset *ps) {
- po_join(exec_ctx, &pss->po, &ps->pollable_obj.po);
+ grpc_fd *fd) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS:%p: del fd %p", pss, fd);
+ }
+ pss = pss_lock_adam(pss);
+ size_t i;
+ for (i = 0; i < pss->fd_count; i++) {
+ if (pss->fds[i] == fd) {
+ UNREF_BY(exec_ctx, fd, 2, "pollset_set");
+ break;
+ }
+ }
+ GPR_ASSERT(i != pss->fd_count);
+ for (; i < pss->fd_count - 1; i++) {
+ pss->fds[i] = pss->fds[i + 1];
+ }
+ pss->fd_count--;
+ gpr_mu_unlock(&pss->mu);
}
static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss, grpc_pollset *ps) {}
-
-static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *bag,
- grpc_pollset_set *item) {
- po_join(exec_ctx, &bag->po, &item->po);
-}
-
-static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *bag,
- grpc_pollset_set *item) {}
-
-static void po_init(polling_obj *po, polling_obj_type type) {
- gpr_mu_init(&po->mu);
- po->type = type;
- po->group = NULL;
- po->next = po;
- po->prev = po;
-}
-
-static polling_group *pg_lock_latest(polling_group *pg) {
- /* assumes pg unlocked; consumes ref, returns ref */
- gpr_mu_lock(&pg->po.mu);
- while (pg->po.group != NULL) {
- polling_group *new_pg = pg_ref(pg->po.group);
- gpr_mu_unlock(&pg->po.mu);
- pg_unref(pg);
- pg = new_pg;
- gpr_mu_lock(&pg->po.mu);
- }
- return pg;
-}
-
-static void po_destroy(polling_obj *po) {
- if (po->group != NULL) {
- polling_group *pg = pg_lock_latest(po->group);
- po->prev->next = po->next;
- po->next->prev = po->prev;
- gpr_mu_unlock(&pg->po.mu);
- pg_unref(pg);
+ grpc_pollset_set *pss, grpc_pollset *ps) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS:%p: del pollset %p", pss, ps);
}
- gpr_mu_destroy(&po->mu);
-}
-
-static polling_group *pg_ref(polling_group *pg) {
- gpr_ref(&pg->refs);
- return pg;
-}
-
-static void pg_unref(polling_group *pg) {
- if (gpr_unref(&pg->refs)) {
- po_destroy(&pg->po);
- gpr_free(pg);
+ pss = pss_lock_adam(pss);
+ size_t i;
+ for (i = 0; i < pss->pollset_count; i++) {
+ if (pss->pollsets[i] == ps) {
+ break;
+ }
}
-}
-
-static int po_cmp(polling_obj *a, polling_obj *b) {
- if (a == b) return 0;
- if (a->type < b->type) return -1;
- if (a->type > b->type) return 1;
- if (a < b) return -1;
- assert(a > b);
- return 1;
-}
-
-static void po_join(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) {
- switch (po_cmp(a, b)) {
- case 0:
- return;
- case 1:
- GPR_SWAP(polling_obj *, a, b);
- /* fall through */
- case -1:
- gpr_mu_lock(&a->mu);
- gpr_mu_lock(&b->mu);
-
- if (a->group == NULL) {
- if (b->group == NULL) {
- polling_obj *initial_po[] = {a, b};
- pg_create(exec_ctx, initial_po, GPR_ARRAY_SIZE(initial_po));
- gpr_mu_unlock(&a->mu);
- gpr_mu_unlock(&b->mu);
- } else {
- polling_group *b_group = pg_ref(b->group);
- gpr_mu_unlock(&b->mu);
- gpr_mu_unlock(&a->mu);
- pg_join(exec_ctx, b_group, a);
- }
- } else if (b->group == NULL) {
- polling_group *a_group = pg_ref(a->group);
- gpr_mu_unlock(&a->mu);
- gpr_mu_unlock(&b->mu);
- pg_join(exec_ctx, a_group, b);
- } else if (a->group == b->group) {
- /* nothing to do */
- gpr_mu_unlock(&a->mu);
- gpr_mu_unlock(&b->mu);
- } else {
- polling_group *a_group = pg_ref(a->group);
- polling_group *b_group = pg_ref(b->group);
- gpr_mu_unlock(&a->mu);
- gpr_mu_unlock(&b->mu);
- pg_merge(exec_ctx, a_group, b_group);
- }
+ GPR_ASSERT(i != pss->pollset_count);
+ for (; i < pss->pollset_count - 1; i++) {
+ pss->pollsets[i] = pss->pollsets[i + 1];
}
-}
-
-static void pg_notify(grpc_exec_ctx *exec_ctx, polling_obj *a, polling_obj *b) {
- if (a->type == PO_FD && b->type == PO_POLLSET) {
- pollset_add_fd_locked(exec_ctx, (grpc_pollset *)b, (grpc_fd *)a, true);
- } else if (a->type == PO_POLLSET && b->type == PO_FD) {
- pollset_add_fd_locked(exec_ctx, (grpc_pollset *)a, (grpc_fd *)b, true);
+ pss->pollset_count--;
+ gpr_mu_unlock(&pss->mu);
+ gpr_mu_lock(&ps->mu);
+ if (0 == --ps->containing_pollset_set_count) {
+ pollset_maybe_finish_shutdown(exec_ctx, ps);
}
+ gpr_mu_unlock(&ps->mu);
}
-static void pg_broadcast(grpc_exec_ctx *exec_ctx, polling_group *from,
- polling_group *to) {
- for (polling_obj *a = from->po.next; a != &from->po; a = a->next) {
- for (polling_obj *b = to->po.next; b != &to->po; b = b->next) {
- if (po_cmp(a, b) < 0) {
- gpr_mu_lock(&a->mu);
- gpr_mu_lock(&b->mu);
- } else {
- GPR_ASSERT(po_cmp(a, b) != 0);
- gpr_mu_lock(&b->mu);
- gpr_mu_lock(&a->mu);
+// add all fds to pollables, and output a new array of unorphaned out_fds
+// assumes pollsets are multipollable
+static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds,
+ size_t fd_count, grpc_pollset **pollsets,
+ size_t pollset_count,
+ const char *err_desc, grpc_fd **out_fds,
+ size_t *out_fd_count) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ for (size_t i = 0; i < fd_count; i++) {
+ gpr_mu_lock(&fds[i]->orphan_mu);
+ if ((gpr_atm_no_barrier_load(&fds[i]->refst) & 1) == 0) {
+ gpr_mu_unlock(&fds[i]->orphan_mu);
+ UNREF_BY(exec_ctx, fds[i], 2, "pollset_set");
+ } else {
+ for (size_t j = 0; j < pollset_count; j++) {
+ append_error(&error,
+ pollable_add_fd(pollsets[j]->active_pollable, fds[i]),
+ err_desc);
}
- pg_notify(exec_ctx, a, b);
- gpr_mu_unlock(&a->mu);
- gpr_mu_unlock(&b->mu);
+ gpr_mu_unlock(&fds[i]->orphan_mu);
+ out_fds[(*out_fd_count)++] = fds[i];
}
}
+ return error;
}
-static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po,
- size_t initial_po_count) {
- /* assumes all polling objects in initial_po are locked */
- polling_group *pg = (polling_group *)gpr_malloc(sizeof(*pg));
- po_init(&pg->po, PO_POLLING_GROUP);
- gpr_ref_init(&pg->refs, (int)initial_po_count);
- for (size_t i = 0; i < initial_po_count; i++) {
- GPR_ASSERT(initial_po[i]->group == NULL);
- initial_po[i]->group = pg;
- }
- for (size_t i = 1; i < initial_po_count; i++) {
- initial_po[i]->prev = initial_po[i - 1];
- }
- for (size_t i = 0; i < initial_po_count - 1; i++) {
- initial_po[i]->next = initial_po[i + 1];
- }
- initial_po[0]->prev = &pg->po;
- initial_po[initial_po_count - 1]->next = &pg->po;
- pg->po.next = initial_po[0];
- pg->po.prev = initial_po[initial_po_count - 1];
- for (size_t i = 1; i < initial_po_count; i++) {
- for (size_t j = 0; j < i; j++) {
- pg_notify(exec_ctx, initial_po[i], initial_po[j]);
- }
+static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pss, grpc_pollset *ps) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS:%p: add pollset %p", pss, ps);
}
-}
-
-static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg,
- polling_obj *po) {
- /* assumes neither pg nor po are locked; consumes one ref to pg */
- pg = pg_lock_latest(pg);
- /* pg locked */
- for (polling_obj *existing = pg->po.next /* skip pg - it's just a stub */;
- existing != &pg->po; existing = existing->next) {
- if (po_cmp(po, existing) < 0) {
- gpr_mu_lock(&po->mu);
- gpr_mu_lock(&existing->mu);
- } else {
- GPR_ASSERT(po_cmp(po, existing) != 0);
- gpr_mu_lock(&existing->mu);
- gpr_mu_lock(&po->mu);
- }
- /* pg, po, existing locked */
- if (po->group != NULL) {
- gpr_mu_unlock(&pg->po.mu);
- polling_group *po_group = pg_ref(po->group);
- gpr_mu_unlock(&po->mu);
- gpr_mu_unlock(&existing->mu);
- pg_merge(exec_ctx, pg, po_group);
- /* early exit: polling obj picked up a group during joining: we needed
- to do a full merge */
- return;
- }
- pg_notify(exec_ctx, po, existing);
- gpr_mu_unlock(&po->mu);
- gpr_mu_unlock(&existing->mu);
- }
- gpr_mu_lock(&po->mu);
- if (po->group != NULL) {
- gpr_mu_unlock(&pg->po.mu);
- polling_group *po_group = pg_ref(po->group);
- gpr_mu_unlock(&po->mu);
- pg_merge(exec_ctx, pg, po_group);
- /* early exit: polling obj picked up a group during joining: we needed
- to do a full merge */
+ grpc_error *error = GRPC_ERROR_NONE;
+ static const char *err_desc = "pollset_set_add_pollset";
+ pollable *pollable_obj = NULL;
+ gpr_mu_lock(&ps->mu);
+ if (!GRPC_LOG_IF_ERROR(err_desc, pollset_as_multipollable_locked(
+ exec_ctx, ps, &pollable_obj))) {
+ GPR_ASSERT(pollable_obj == NULL);
+ gpr_mu_unlock(&ps->mu);
return;
}
- po->group = pg;
- po->next = &pg->po;
- po->prev = pg->po.prev;
- po->prev->next = po->next->prev = po;
- gpr_mu_unlock(&pg->po.mu);
- gpr_mu_unlock(&po->mu);
+ ps->containing_pollset_set_count++;
+ gpr_mu_unlock(&ps->mu);
+ pss = pss_lock_adam(pss);
+ size_t initial_fd_count = pss->fd_count;
+ pss->fd_count = 0;
+ append_error(&error,
+ add_fds_to_pollsets(exec_ctx, pss->fds, initial_fd_count, &ps, 1,
+ err_desc, pss->fds, &pss->fd_count),
+ err_desc);
+ if (pss->pollset_count == pss->pollset_capacity) {
+ pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8);
+ pss->pollsets = (grpc_pollset **)gpr_realloc(
+ pss->pollsets, pss->pollset_capacity * sizeof(*pss->pollsets));
+ }
+ pss->pollsets[pss->pollset_count++] = ps;
+ gpr_mu_unlock(&pss->mu);
+ POLLABLE_UNREF(pollable_obj, "pollset_set");
+
+ GRPC_LOG_IF_ERROR(err_desc, error);
}
-static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a,
- polling_group *b) {
+static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *a,
+ grpc_pollset_set *b) {
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS: merge (%p, %p)", a, b);
+ }
+ grpc_error *error = GRPC_ERROR_NONE;
+ static const char *err_desc = "pollset_set_add_fd";
for (;;) {
if (a == b) {
- pg_unref(a);
- pg_unref(b);
+ // pollset ancestors are the same: nothing to do
return;
}
- if (a > b) GPR_SWAP(polling_group *, a, b);
- gpr_mu_lock(&a->po.mu);
- gpr_mu_lock(&b->po.mu);
- if (a->po.group != NULL) {
- polling_group *m2 = pg_ref(a->po.group);
- gpr_mu_unlock(&a->po.mu);
- gpr_mu_unlock(&b->po.mu);
- pg_unref(a);
- a = m2;
- } else if (b->po.group != NULL) {
- polling_group *m2 = pg_ref(b->po.group);
- gpr_mu_unlock(&a->po.mu);
- gpr_mu_unlock(&b->po.mu);
- pg_unref(b);
- b = m2;
+ if (a > b) {
+ GPR_SWAP(grpc_pollset_set *, a, b);
+ }
+ gpr_mu *a_mu = &a->mu;
+ gpr_mu *b_mu = &b->mu;
+ gpr_mu_lock(a_mu);
+ gpr_mu_lock(b_mu);
+ if (a->parent != NULL) {
+ a = a->parent;
+ } else if (b->parent != NULL) {
+ b = b->parent;
} else {
- break;
+ break; // exit loop, both pollsets locked
}
+ gpr_mu_unlock(a_mu);
+ gpr_mu_unlock(b_mu);
}
- polling_group **unref = NULL;
- size_t unref_count = 0;
- size_t unref_cap = 0;
- b->po.group = a;
- pg_broadcast(exec_ctx, a, b);
- pg_broadcast(exec_ctx, b, a);
- while (b->po.next != &b->po) {
- polling_obj *po = b->po.next;
- gpr_mu_lock(&po->mu);
- if (unref_count == unref_cap) {
- unref_cap = GPR_MAX(8, 3 * unref_cap / 2);
- unref = (polling_group **)gpr_realloc(unref, unref_cap * sizeof(*unref));
- }
- unref[unref_count++] = po->group;
- po->group = pg_ref(a);
- // unlink from b
- po->prev->next = po->next;
- po->next->prev = po->prev;
- // link to a
- po->next = &a->po;
- po->prev = a->po.prev;
- po->next->prev = po->prev->next = po;
- gpr_mu_unlock(&po->mu);
- }
- gpr_mu_unlock(&a->po.mu);
- gpr_mu_unlock(&b->po.mu);
- for (size_t i = 0; i < unref_count; i++) {
- pg_unref(unref[i]);
- }
- gpr_free(unref);
- pg_unref(b);
+ // try to do the least copying possible
+ // TODO(ctiller): there's probably a better heuristic here
+ const size_t a_size = a->fd_count + a->pollset_count;
+ const size_t b_size = b->fd_count + b->pollset_count;
+ if (b_size > a_size) {
+ GPR_SWAP(grpc_pollset_set *, a, b);
+ }
+ if (grpc_polling_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "PSS: parent %p to %p", b, a);
+ }
+ gpr_ref(&a->refs);
+ b->parent = a;
+ if (a->fd_capacity < a->fd_count + b->fd_count) {
+ a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count);
+ a->fds = (grpc_fd **)gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds));
+ }
+ size_t initial_a_fd_count = a->fd_count;
+ a->fd_count = 0;
+ append_error(&error, add_fds_to_pollsets(exec_ctx, a->fds, initial_a_fd_count,
+ b->pollsets, b->pollset_count,
+ "merge_a2b", a->fds, &a->fd_count),
+ err_desc);
+ append_error(&error, add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count,
+ a->pollsets, a->pollset_count,
+ "merge_b2a", a->fds, &a->fd_count),
+ err_desc);
+ if (a->pollset_capacity < a->pollset_count + b->pollset_count) {
+ a->pollset_capacity =
+ GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count);
+ a->pollsets = (grpc_pollset **)gpr_realloc(
+ a->pollsets, a->pollset_capacity * sizeof(*a->pollsets));
+ }
+ if (b->pollset_count > 0) {
+ memcpy(a->pollsets + a->pollset_count, b->pollsets,
+ b->pollset_count * sizeof(*b->pollsets));
+ }
+ a->pollset_count += b->pollset_count;
+ gpr_free(b->fds);
+ gpr_free(b->pollsets);
+ b->fds = NULL;
+ b->pollsets = NULL;
+ b->fd_count = b->fd_capacity = b->pollset_count = b->pollset_capacity = 0;
+ gpr_mu_unlock(&a->mu);
+ gpr_mu_unlock(&b->mu);
}
+static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {}
+
/*******************************************************************************
* Event engine binding
*/
@@ -1399,7 +1427,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_add_fd,
pollset_set_create,
- pollset_set_destroy,
+ pollset_set_unref, // destroy ==> unref 1 public ref
pollset_set_add_pollset,
pollset_set_del_pollset,
pollset_set_add_pollset_set,
@@ -1412,6 +1440,10 @@ static const grpc_event_engine_vtable vtable = {
const grpc_event_engine_vtable *grpc_init_epollex_linux(
bool explicitly_requested) {
+ if (!explicitly_requested) {
+ return NULL;
+ }
+
if (!grpc_has_wakeup_fd()) {
return NULL;
}
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 45403c3037..28e4efa011 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -91,12 +91,9 @@ const grpc_event_engine_vtable *init_non_polling(bool explicit_request) {
} // namespace
static const event_engine_factory g_factories[] = {
- {"epoll1", grpc_init_epoll1_linux},
- {"epollsig", grpc_init_epollsig_linux},
- {"poll", grpc_init_poll_posix},
- {"poll-cv", grpc_init_poll_cv_posix},
- {"epollex", grpc_init_epollex_linux},
- {"none", init_non_polling},
+ {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux},
+ {"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix},
+ {"poll-cv", grpc_init_poll_cv_posix}, {"none", init_non_polling},
};
static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
diff --git a/src/core/lib/iomgr/exec_ctx.cc b/src/core/lib/iomgr/exec_ctx.cc
index 6707f40f10..e4d956dbef 100644
--- a/src/core/lib/iomgr/exec_ctx.cc
+++ b/src/core/lib/iomgr/exec_ctx.cc
@@ -152,6 +152,15 @@ void grpc_exec_ctx_invalidate_now(grpc_exec_ctx *exec_ctx) {
gpr_timespec grpc_millis_to_timespec(grpc_millis millis,
gpr_clock_type clock_type) {
+ // special-case infinities as grpc_millis can be 32bit on some platforms
+ // while gpr_time_from_millis always takes an int64_t.
+ if (millis == GRPC_MILLIS_INF_FUTURE) {
+ return gpr_inf_future(clock_type);
+ }
+ if (millis == GRPC_MILLIS_INF_PAST) {
+ return gpr_inf_past(clock_type);
+ }
+
if (clock_type == GPR_TIMESPAN) {
return gpr_time_from_millis(millis, GPR_TIMESPAN);
}
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index ae8d7b4724..cfdacaf988 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -186,7 +186,7 @@ static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
}
if (old_count == 0) {
GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx);
- p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size());
+ p = (backup_poller *)gpr_zalloc(sizeof(*p) + grpc_pollset_size());
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p);
}