aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/combiner.c4
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c310
-rw-r--r--src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c5
-rw-r--r--src/core/lib/iomgr/ev_epoll_thread_pool_linux.c6
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c8
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.c7
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c405
-rw-r--r--src/core/lib/iomgr/ev_posix.c5
-rw-r--r--src/core/lib/iomgr/ev_posix.h4
-rw-r--r--src/core/lib/iomgr/exec_ctx.c50
-rw-r--r--src/core/lib/iomgr/gethostname.h26
-rw-r--r--src/core/lib/iomgr/gethostname_fallback.c27
-rw-r--r--src/core/lib/iomgr/gethostname_host_name_max.c37
-rw-r--r--src/core/lib/iomgr/gethostname_sysconf.c37
-rw-r--r--src/core/lib/iomgr/nameser.h104
-rw-r--r--src/core/lib/iomgr/port.h14
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c6
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c2
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.c2
-rw-r--r--src/core/lib/iomgr/udp_server.c2
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.h6
22 files changed, 831 insertions, 238 deletions
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index 518024fcfd..0e496829f6 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -73,10 +73,8 @@ static const grpc_closure_scheduler_vtable finally_scheduler = {
static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
grpc_combiner *grpc_combiner_create(void) {
- grpc_combiner *lock = gpr_malloc(sizeof(*lock));
+ grpc_combiner *lock = gpr_zalloc(sizeof(*lock));
gpr_ref_init(&lock->refs, 1);
- lock->next_combiner_on_this_exec_ctx = NULL;
- lock->time_to_execute_final_list = false;
lock->scheduler.vtable = &scheduler;
lock->finally_scheduler.vtable = &finally_scheduler;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index b89b8af15a..b940d48ba9 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -48,7 +48,60 @@
#include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd;
-static int g_epfd;
+
+/*******************************************************************************
+ * Singleton epoll set related fields
+ */
+
+#define MAX_EPOLL_EVENTS 100
+#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
+
+/* NOTE ON SYNCHRONIZATION:
+ * - Fields in this struct are only modified by the designated poller. Hence
+ * there is no need for any locks to protect the struct.
+ * - num_events and cursor fields have to be of atomic type to provide memory
+ * visibility guarantees only. i.e In case of multiple pollers, the designated
+ * polling thread keeps changing; the thread that wrote these values may be
+ * different from the thread reading the values
+ */
+typedef struct epoll_set {
+ int epfd;
+
+ /* The epoll_events after the last call to epoll_wait() */
+ struct epoll_event events[MAX_EPOLL_EVENTS];
+
+ /* The number of epoll_events after the last call to epoll_wait() */
+ gpr_atm num_events;
+
+ /* Index of the first event in epoll_events that has to be processed. This
+ * field is only valid if num_events > 0 */
+ gpr_atm cursor;
+} epoll_set;
+
+/* The global singleton epoll set */
+static epoll_set g_epoll_set;
+
+/* Must be called *only* once */
+static bool epoll_set_init() {
+ g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
+ if (g_epoll_set.epfd < 0) {
+ gpr_log(GPR_ERROR, "epoll unavailable");
+ return false;
+ }
+
+ gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
+ gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
+ gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
+ return true;
+}
+
+/* epoll_set_init() MUST be called before calling this. */
+static void epoll_set_shutdown() {
+ if (g_epoll_set.epfd >= 0) {
+ close(g_epoll_set.epfd);
+ g_epoll_set.epfd = -1;
+ }
+}
/*******************************************************************************
* Fd Declarations
@@ -122,7 +175,7 @@ struct grpc_pollset {
bool kicked_without_poller;
/* Set to true if the pollset is observed to have no workers available to
- * poll */
+ poll */
bool seen_inactive;
bool shutting_down; /* Is the pollset shutting down ? */
grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
@@ -228,7 +281,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
.data.ptr = new_fd};
- if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
+ if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
@@ -237,30 +290,43 @@ static grpc_fd *fd_create(int fd, const char *name) {
static int fd_wrapped_fd(grpc_fd *fd) { return fd->fd; }
-/* Might be called multiple times */
-static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
+/* if 'releasing_fd' is true, it means that we are going to detach the internal
+ * fd from grpc_fd structure (i.e which means we should not be calling
+ * shutdown() syscall on that fd) */
+static void fd_shutdown_internal(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_error *why, bool releasing_fd) {
if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
GRPC_ERROR_REF(why))) {
- shutdown(fd->fd, SHUT_RDWR);
+ if (!releasing_fd) {
+ shutdown(fd->fd, SHUT_RDWR);
+ }
grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
}
GRPC_ERROR_UNREF(why);
}
+/* Might be called multiple times */
+static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
+ fd_shutdown_internal(exec_ctx, fd, why, false);
+}
+
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
+ bool is_release_fd = (release_fd != NULL);
if (!grpc_lfev_is_shutdown(&fd->read_closure)) {
- fd_shutdown(exec_ctx, fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason));
+ fd_shutdown_internal(exec_ctx, fd,
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
+ is_release_fd);
}
/* If release_fd is not NULL, we should be relinquishing control of the file
descriptor fd->fd (but we still own the grpc_fd structure). */
- if (release_fd != NULL) {
+ if (is_release_fd) {
*release_fd = fd->fd;
- } else {
+ } else if (!already_closed) {
close(fd->fd);
}
@@ -313,7 +379,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
+
+/* The designated poller */
static gpr_atm g_active_poller;
+
static pollset_neighbourhood *g_neighbourhoods;
static size_t g_num_neighbourhoods;
@@ -367,7 +436,8 @@ static grpc_error *pollset_global_init(void) {
if (err != GRPC_ERROR_NONE) return err;
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
.data.ptr = &global_wakeup_fd};
- if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
+ if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
+ &ev) != 0) {
return GRPC_OS_ERROR(errno, "epoll_ctl");
}
g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
@@ -393,7 +463,14 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
gpr_mu_init(&pollset->mu);
*mu = &pollset->mu;
pollset->neighbourhood = &g_neighbourhoods[choose_neighbourhood()];
+ pollset->reassigning_neighbourhood = false;
+ pollset->root_worker = NULL;
+ pollset->kicked_without_poller = false;
pollset->seen_inactive = true;
+ pollset->shutting_down = false;
+ pollset->shutdown_closure = NULL;
+ pollset->begin_refs = 0;
+ pollset->next = pollset->prev = NULL;
}
static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
@@ -425,6 +502,7 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
}
static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
+ GPR_TIMER_BEGIN("pollset_kick_all", 0);
grpc_error *error = GRPC_ERROR_NONE;
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
@@ -450,7 +528,7 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
}
// TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
// in the else case
-
+ GPR_TIMER_END("pollset_kick_all", 0);
return error;
}
@@ -458,6 +536,7 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset) {
if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
pollset->begin_refs == 0) {
+ GPR_TIMER_MARK("pollset_finish_shutdown", 0);
GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
pollset->shutdown_closure = NULL;
}
@@ -465,16 +544,16 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
+ GPR_TIMER_BEGIN("pollset_shutdown", 0);
GPR_ASSERT(pollset->shutdown_closure == NULL);
GPR_ASSERT(!pollset->shutting_down);
pollset->shutdown_closure = closure;
pollset->shutting_down = true;
GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
pollset_maybe_finish_shutdown(exec_ctx, pollset);
+ GPR_TIMER_END("pollset_shutdown", 0);
}
-#define MAX_EPOLL_EVENTS 100
-
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
@@ -493,52 +572,93 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
return millis >= 1 ? millis : 1;
}
-static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- gpr_timespec now, gpr_timespec deadline) {
- struct epoll_event events[MAX_EPOLL_EVENTS];
- static const char *err_desc = "pollset_poll";
-
- int timeout = poll_deadline_to_millis_timeout(deadline, now);
+/* Process the epoll events found by do_epoll_wait() function.
+ - g_epoll_set.cursor points to the index of the first event to be processed
+ - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and
+ updates the g_epoll_set.cursor
+
+ NOTE ON SYNCRHONIZATION: Similar to do_epoll_wait(), this function is only
+ called by g_active_poller thread. So there is no need for synchronization
+ when accessing fields in g_epoll_set */
+static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset) {
+ static const char *err_desc = "process_events";
+ grpc_error *error = GRPC_ERROR_NONE;
- if (timeout != 0) {
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- }
- int r;
- do {
- r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
- } while (r < 0 && errno == EINTR);
- if (timeout != 0) {
- GRPC_SCHEDULING_END_BLOCKING_REGION;
- }
+ GPR_TIMER_BEGIN("process_epoll_events", 0);
+ long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
+ long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
+ for (int idx = 0;
+ (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
+ idx++) {
+ long c = cursor++;
+ struct epoll_event *ev = &g_epoll_set.events[c];
+ void *data_ptr = ev->data.ptr;
- if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
-
- grpc_error *error = GRPC_ERROR_NONE;
- for (int i = 0; i < r; i++) {
- void *data_ptr = events[i].data.ptr;
if (data_ptr == &global_wakeup_fd) {
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else {
grpc_fd *fd = (grpc_fd *)(data_ptr);
- bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
- bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
- bool write_ev = (events[i].events & EPOLLOUT) != 0;
+ bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+ bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
+ bool write_ev = (ev->events & EPOLLOUT) != 0;
+
if (read_ev || cancel) {
fd_become_readable(exec_ctx, fd, pollset);
}
+
if (write_ev || cancel) {
fd_become_writable(exec_ctx, fd);
}
}
}
-
+ gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
+ GPR_TIMER_END("process_epoll_events", 0);
return error;
}
+/* Do epoll_wait and store the events in g_epoll_set.events field. This does not
+ "process" any of the events yet; that is done in process_epoll_events().
+ *See process_epoll_events() function for more details.
+
+ NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
+ (i.e the designated poller thread) will be calling this function. So there is
+ no need for any synchronization when accesing fields in g_epoll_set */
+static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
+ gpr_timespec now, gpr_timespec deadline) {
+ GPR_TIMER_BEGIN("do_epoll_wait", 0);
+
+ int r;
+ int timeout = poll_deadline_to_millis_timeout(deadline, now);
+ if (timeout != 0) {
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ }
+ do {
+ r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
+ timeout);
+ } while (r < 0 && errno == EINTR);
+ if (timeout != 0) {
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+ }
+
+ if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
+
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
+ }
+
+ gpr_atm_rel_store(&g_epoll_set.num_events, r);
+ gpr_atm_rel_store(&g_epoll_set.cursor, 0);
+
+ GPR_TIMER_END("do_epoll_wait", 0);
+ return GRPC_ERROR_NONE;
+}
+
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl, gpr_timespec *now,
gpr_timespec deadline) {
+ GPR_TIMER_BEGIN("begin_worker", 0);
if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false;
SET_KICK_STATE(worker, UNKICKED);
@@ -643,14 +763,17 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
if (pollset->kicked_without_poller) {
pollset->kicked_without_poller = false;
+ GPR_TIMER_END("begin_worker", 0);
return false;
}
+ GPR_TIMER_END("begin_worker", 0);
return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
}
static bool check_neighbourhood_for_available_poller(
pollset_neighbourhood *neighbourhood) {
+ GPR_TIMER_BEGIN("check_neighbourhood_for_available_poller", 0);
bool found_worker = false;
do {
grpc_pollset *inspect = neighbourhood->active_root;
@@ -672,6 +795,7 @@ static bool check_neighbourhood_for_available_poller(
}
SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
if (inspect_worker->initialized_cv) {
+ GPR_TIMER_MARK("signal worker", 0);
gpr_cv_signal(&inspect_worker->cv);
}
} else {
@@ -707,12 +831,14 @@ static bool check_neighbourhood_for_available_poller(
}
gpr_mu_unlock(&inspect->mu);
} while (!found_worker);
+ GPR_TIMER_END("check_neighbourhood_for_available_poller", 0);
return found_worker;
}
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
+ GPR_TIMER_BEGIN("end_worker", 0);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
}
@@ -782,42 +908,71 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
+ GPR_TIMER_END("end_worker", 0);
}
/* pollset->po.mu lock must be held by the caller before calling this.
The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
-static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker;
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_work";
- if (pollset->kicked_without_poller) {
- pollset->kicked_without_poller = false;
+ GPR_TIMER_BEGIN("pollset_work", 0);
+ if (ps->kicked_without_poller) {
+ ps->kicked_without_poller = false;
+ GPR_TIMER_END("pollset_work", 0);
return GRPC_ERROR_NONE;
}
- if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+
+ if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
- GPR_ASSERT(!pollset->shutting_down);
- GPR_ASSERT(!pollset->seen_inactive);
- gpr_mu_unlock(&pollset->mu);
- append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
- err_desc);
- gpr_mu_lock(&pollset->mu);
+ GPR_ASSERT(!ps->shutting_down);
+ GPR_ASSERT(!ps->seen_inactive);
+
+ gpr_mu_unlock(&ps->mu); /* unlock */
+ /* This is the designated polling thread at this point and should ideally do
+ polling. However, if there are unprocessed events left from a previous
+ call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
+ process the pending epoll events.
+
+ The reason for decoupling do_epoll_wait and process_epoll_events is to
+ better distrubute the work (i.e handling epoll events) across multiple
+ threads
+
+ process_epoll_events() returns very quickly: It just queues the work on
+ exec_ctx but does not execute it (the actual exectution or more
+ accurately grpc_exec_ctx_flush() happens in end_worker() AFTER selecting
+ a designated poller). So we are not waiting long periods without a
+ designated poller */
+ if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
+ gpr_atm_acq_load(&g_epoll_set.num_events)) {
+ append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
+ err_desc);
+ }
+ append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);
+
+ gpr_mu_lock(&ps->mu); /* lock */
+
gpr_tls_set(&g_current_thread_worker, 0);
} else {
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
}
- end_worker(exec_ctx, pollset, &worker, worker_hdl);
+ end_worker(exec_ctx, ps, &worker, worker_hdl);
+
gpr_tls_set(&g_current_thread_pollset, 0);
+ GPR_TIMER_END("pollset_work", 0);
return error;
}
static grpc_error *pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
+ GPR_TIMER_BEGIN("pollset_kick", 0);
+ grpc_error *ret_err = GRPC_ERROR_NONE;
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_strvec log;
gpr_strvec_init(&log);
@@ -852,7 +1007,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked_without_poller");
}
- return GRPC_ERROR_NONE;
+ goto done;
}
grpc_pollset_worker *next_worker = root_worker->next;
if (root_worker->kick_state == KICKED) {
@@ -860,13 +1015,13 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
- return GRPC_ERROR_NONE;
+ goto done;
} else if (next_worker->kick_state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
}
SET_KICK_STATE(next_worker, KICKED);
- return GRPC_ERROR_NONE;
+ goto done;
} else if (root_worker ==
next_worker && // only try and wake up a poller if
// there is no next worker
@@ -876,7 +1031,8 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
}
SET_KICK_STATE(root_worker, KICKED);
- return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+ ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+ goto done;
} else if (next_worker->kick_state == UNKICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
@@ -884,7 +1040,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
GPR_ASSERT(next_worker->initialized_cv);
SET_KICK_STATE(next_worker, KICKED);
gpr_cv_signal(&next_worker->cv);
- return GRPC_ERROR_NONE;
+ goto done;
} else if (next_worker->kick_state == DESIGNATED_POLLER) {
if (root_worker->kick_state != DESIGNATED_POLLER) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
@@ -897,59 +1053,64 @@ static grpc_error *pollset_kick(grpc_pollset *pollset,
if (root_worker->initialized_cv) {
gpr_cv_signal(&root_worker->cv);
}
- return GRPC_ERROR_NONE;
+ goto done;
} else {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
root_worker);
}
SET_KICK_STATE(next_worker, KICKED);
- return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+ ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+ goto done;
}
} else {
GPR_ASSERT(next_worker->kick_state == KICKED);
SET_KICK_STATE(next_worker, KICKED);
- return GRPC_ERROR_NONE;
+ goto done;
}
} else {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kicked while waking up");
}
- return GRPC_ERROR_NONE;
+ goto done;
}
} else if (specific_worker->kick_state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. specific worker already kicked");
}
- return GRPC_ERROR_NONE;
+ goto done;
} else if (gpr_tls_get(&g_current_thread_worker) ==
(intptr_t)specific_worker) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
}
SET_KICK_STATE(specific_worker, KICKED);
- return GRPC_ERROR_NONE;
+ goto done;
} else if (specific_worker ==
(grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kick active poller");
}
SET_KICK_STATE(specific_worker, KICKED);
- return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+ ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+ goto done;
} else if (specific_worker->initialized_cv) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kick waiting worker");
}
SET_KICK_STATE(specific_worker, KICKED);
gpr_cv_signal(&specific_worker->cv);
- return GRPC_ERROR_NONE;
+ goto done;
} else {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. kick non-waiting worker");
}
SET_KICK_STATE(specific_worker, KICKED);
- return GRPC_ERROR_NONE;
+ goto done;
}
+done:
+ GPR_TIMER_END("pollset_kick", 0);
+ return ret_err;
}
static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -993,7 +1154,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
- close(g_epfd);
+ epoll_set_shutdown();
}
static const grpc_event_engine_vtable vtable = {
@@ -1028,28 +1189,29 @@ static const grpc_event_engine_vtable vtable = {
};
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
- * Create a dummy epoll_fd to make sure epoll support is available */
+ * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
+ * support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
+ if (!explicit_request) {
+ return NULL;
+ }
+
if (!grpc_has_wakeup_fd()) {
return NULL;
}
- g_epfd = epoll_create1(EPOLL_CLOEXEC);
- if (g_epfd < 0) {
- gpr_log(GPR_ERROR, "epoll unavailable");
+ if (!epoll_set_init()) {
return NULL;
}
fd_global_init();
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
- close(g_epfd);
fd_global_shutdown();
+ epoll_set_shutdown();
return NULL;
}
- gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
-
return &vtable;
}
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index 5166dc2ac2..f2f3e15704 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -931,7 +931,7 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
@@ -952,8 +952,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
before doing this.) */
if (fd->po.pi != NULL) {
polling_island *pi_latest = polling_island_lock(fd->po.pi);
- polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */,
- &error);
+ polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
index 1058f69a83..07c8eadf4f 100644
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
@@ -493,8 +493,8 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
- bool is_fd_closed = false;
+ bool already_closed, const char *reason) {
+ bool is_fd_closed = already_closed;
grpc_error *error = GRPC_ERROR_NONE;
epoll_set *unref_eps = NULL;
@@ -505,7 +505,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != NULL) {
*release_fd = fd->fd;
- } else {
+ } else if (!is_fd_closed) {
close(fd->fd);
is_fd_closed = true;
}
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index a2fa4ad9a5..1f4adea5d4 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -49,7 +49,7 @@
#include "src/core/lib/support/spinlock.h"
/*******************************************************************************
- * Pollset-set sibling link
+ * Polling object
*/
typedef enum {
@@ -380,8 +380,8 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
- bool is_fd_closed = false;
+ bool already_closed, const char *reason) {
+ bool is_fd_closed = already_closed;
grpc_error *error = GRPC_ERROR_NONE;
gpr_mu_lock(&fd->pollable.po.mu);
@@ -392,7 +392,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
descriptor fd->fd (but we still own the grpc_fd structure). */
if (release_fd != NULL) {
*release_fd = fd->fd;
- } else {
+ } else if (!is_fd_closed) {
close(fd->fd);
is_fd_closed = true;
}
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index a84c208e1a..070d75e42a 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -854,7 +854,7 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
grpc_error *error = GRPC_ERROR_NONE;
polling_island *unref_pi = NULL;
@@ -875,8 +875,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
before doing this.) */
if (fd->po.pi != NULL) {
polling_island *pi_latest = polling_island_lock(fd->po.pi);
- polling_island_remove_fd_locked(pi_latest, fd, false /* is_fd_closed */,
- &error);
+ polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
gpr_mu_unlock(&pi_latest->mu);
unref_pi = fd->po.pi;
@@ -1731,7 +1730,7 @@ const grpc_event_engine_vtable *grpc_init_epollsig_linux(
if (!is_grpc_wakeup_signal_initialized) {
/* TODO(ctiller): when other epoll engines are ready, remove the true || to
* force this to be explitly chosen if needed */
- if (explicit_request) {
+ if (true || explicit_request) {
grpc_use_signal(SIGRTMIN + 6);
} else {
return NULL;
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 593d53e4eb..266c280c48 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -42,6 +42,7 @@
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/support/murmur_hash.h"
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
@@ -239,22 +240,43 @@ struct grpc_pollset_set {
* condition variable polling definitions
*/
+#define POLLCV_THREAD_GRACE_MS 1000
#define CV_POLL_PERIOD_MS 1000
#define CV_DEFAULT_TABLE_SIZE 16
-typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t;
-
-typedef struct poll_args {
+typedef struct poll_result {
gpr_refcount refcount;
- gpr_cv *cv;
+ cv_node *watchers;
+ int watchcount;
struct pollfd *fds;
nfds_t nfds;
- int timeout;
int retval;
int err;
- gpr_atm status;
+ int completed;
+} poll_result;
+
+typedef struct poll_args {
+ gpr_cv trigger;
+ int trigger_set;
+ struct pollfd *fds;
+ nfds_t nfds;
+ poll_result *result;
+ struct poll_args *next;
+ struct poll_args *prev;
} poll_args;
+// This is a 2-tiered cache, we mantain a hash table
+// of active poll calls, so we can wait on the result
+// of that call. We also maintain a freelist of inactive
+// poll threads.
+typedef struct poll_hash_table {
+ poll_args *free_pollers;
+ poll_args **active_pollers;
+ unsigned int size;
+ unsigned int count;
+} poll_hash_table;
+
+poll_hash_table poll_cache;
cv_fd_table g_cvfds;
/*******************************************************************************
@@ -398,11 +420,14 @@ static int fd_wrapped_fd(grpc_fd *fd) {
static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *on_done, int *release_fd,
- const char *reason) {
+ bool already_closed, const char *reason) {
fd->on_done_closure = on_done;
fd->released = release_fd != NULL;
- if (fd->released) {
+ if (release_fd != NULL) {
*release_fd = fd->fd;
+ fd->released = true;
+ } else if (already_closed) {
+ fd->released = true;
}
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
@@ -1284,43 +1309,205 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
* Condition Variable polling extensions
*/
-static void decref_poll_args(poll_args *args) {
- if (gpr_unref(&args->refcount)) {
- gpr_free(args->fds);
- gpr_cv_destroy(args->cv);
- gpr_free(args->cv);
- gpr_free(args);
+static void run_poll(void *args);
+static void cache_poller_locked(poll_args *args);
+
+static void cache_insert_locked(poll_args *args) {
+ uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd),
+ 0xDEADBEEF);
+ key = key % poll_cache.size;
+ if (poll_cache.active_pollers[key]) {
+ poll_cache.active_pollers[key]->prev = args;
}
+ args->next = poll_cache.active_pollers[key];
+ args->prev = NULL;
+ poll_cache.active_pollers[key] = args;
+ poll_cache.count++;
}
-// Poll in a background thread
-static void run_poll(void *arg) {
- int timeout, retval;
- poll_args *pargs = (poll_args *)arg;
- while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
- if (pargs->timeout < 0) {
- timeout = CV_POLL_PERIOD_MS;
- } else {
- timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout);
- pargs->timeout -= timeout;
+static void init_result(poll_args *pargs) {
+ pargs->result = gpr_malloc(sizeof(poll_result));
+ gpr_ref_init(&pargs->result->refcount, 1);
+ pargs->result->watchers = NULL;
+ pargs->result->watchcount = 0;
+ pargs->result->fds = gpr_malloc(sizeof(struct pollfd) * pargs->nfds);
+ memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds);
+ pargs->result->nfds = pargs->nfds;
+ pargs->result->retval = 0;
+ pargs->result->err = 0;
+ pargs->result->completed = 0;
+}
+
+// Creates a poll_args object for a given arguments to poll().
+// This object may return a poll_args in the cache.
+static poll_args *get_poller_locked(struct pollfd *fds, nfds_t count) {
+ uint32_t key =
+ gpr_murmur_hash3(fds, count * sizeof(struct pollfd), 0xDEADBEEF);
+ key = key % poll_cache.size;
+ poll_args *curr = poll_cache.active_pollers[key];
+ while (curr) {
+ if (curr->nfds == count &&
+ memcmp(curr->fds, fds, count * sizeof(struct pollfd)) == 0) {
+ gpr_free(fds);
+ return curr;
}
- retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout);
- if (retval != 0 || pargs->timeout == 0) {
- pargs->retval = retval;
- pargs->err = errno;
- break;
+ curr = curr->next;
+ }
+
+ if (poll_cache.free_pollers) {
+ poll_args *pargs = poll_cache.free_pollers;
+ poll_cache.free_pollers = pargs->next;
+ if (poll_cache.free_pollers) {
+ poll_cache.free_pollers->prev = NULL;
}
+ pargs->fds = fds;
+ pargs->nfds = count;
+ pargs->next = NULL;
+ pargs->prev = NULL;
+ init_result(pargs);
+ cache_poller_locked(pargs);
+ return pargs;
+ }
+
+ poll_args *pargs = gpr_malloc(sizeof(struct poll_args));
+ gpr_cv_init(&pargs->trigger);
+ pargs->fds = fds;
+ pargs->nfds = count;
+ pargs->next = NULL;
+ pargs->prev = NULL;
+ pargs->trigger_set = 0;
+ init_result(pargs);
+ cache_poller_locked(pargs);
+ gpr_thd_id t_id;
+ gpr_thd_options opt = gpr_thd_options_default();
+ gpr_ref(&g_cvfds.pollcount);
+ gpr_thd_options_set_detached(&opt);
+ GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt));
+ return pargs;
+}
+
+static void cache_delete_locked(poll_args *args) {
+ if (!args->prev) {
+ uint32_t key = gpr_murmur_hash3(
+ args->fds, args->nfds * sizeof(struct pollfd), 0xDEADBEEF);
+ key = key % poll_cache.size;
+ GPR_ASSERT(poll_cache.active_pollers[key] == args);
+ poll_cache.active_pollers[key] = args->next;
+ } else {
+ args->prev->next = args->next;
}
- gpr_mu_lock(&g_cvfds.mu);
- if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) {
- // Signal main thread that the poll completed
- gpr_atm_no_barrier_store(&pargs->status, COMPLETED);
- gpr_cv_signal(pargs->cv);
+
+ if (args->next) {
+ args->next->prev = args->prev;
}
- decref_poll_args(pargs);
- g_cvfds.pollcount--;
- if (g_cvfds.shutdown && g_cvfds.pollcount == 0) {
- gpr_cv_signal(&g_cvfds.shutdown_complete);
+
+ poll_cache.count--;
+ if (poll_cache.free_pollers) {
+ poll_cache.free_pollers->prev = args;
+ }
+ args->prev = NULL;
+ args->next = poll_cache.free_pollers;
+ gpr_free(args->fds);
+ poll_cache.free_pollers = args;
+}
+
+static void cache_poller_locked(poll_args *args) {
+ if (poll_cache.count + 1 > poll_cache.size / 2) {
+ poll_args **old_active_pollers = poll_cache.active_pollers;
+ poll_cache.size = poll_cache.size * 2;
+ poll_cache.count = 0;
+ poll_cache.active_pollers = gpr_malloc(sizeof(void *) * poll_cache.size);
+ for (unsigned int i = 0; i < poll_cache.size; i++) {
+ poll_cache.active_pollers[i] = NULL;
+ }
+ for (unsigned int i = 0; i < poll_cache.size / 2; i++) {
+ poll_args *curr = old_active_pollers[i];
+ poll_args *next = NULL;
+ while (curr) {
+ next = curr->next;
+ cache_insert_locked(curr);
+ curr = next;
+ }
+ }
+ gpr_free(old_active_pollers);
+ }
+
+ cache_insert_locked(args);
+}
+
+static void cache_destroy_locked(poll_args *args) {
+ if (args->next) {
+ args->next->prev = args->prev;
+ }
+
+ if (args->prev) {
+ args->prev->next = args->next;
+ } else {
+ poll_cache.free_pollers = args->next;
+ }
+
+ gpr_free(args);
+}
+
+static void decref_poll_result(poll_result *res) {
+ if (gpr_unref(&res->refcount)) {
+ GPR_ASSERT(!res->watchers);
+ gpr_free(res->fds);
+ gpr_free(res);
+ }
+}
+
+void remove_cvn(cv_node **head, cv_node *target) {
+ if (target->next) {
+ target->next->prev = target->prev;
+ }
+
+ if (target->prev) {
+ target->prev->next = target->next;
+ } else {
+ *head = target->next;
+ }
+}
+
+gpr_timespec thread_grace;
+
+// Poll in a background thread
+static void run_poll(void *args) {
+ poll_args *pargs = (poll_args *)args;
+ while (1) {
+ poll_result *result = pargs->result;
+ int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS);
+ gpr_mu_lock(&g_cvfds.mu);
+ if (retval != 0) {
+ result->completed = 1;
+ result->retval = retval;
+ result->err = errno;
+ cv_node *watcher = result->watchers;
+ while (watcher) {
+ gpr_cv_signal(watcher->cv);
+ watcher = watcher->next;
+ }
+ }
+ if (result->watchcount == 0 || result->completed) {
+ cache_delete_locked(pargs);
+ decref_poll_result(result);
+ // Leave this polling thread alive for a grace period to do another poll()
+ // op
+ gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
+ deadline = gpr_time_add(deadline, thread_grace);
+ pargs->trigger_set = 0;
+ gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline);
+ if (!pargs->trigger_set) {
+ cache_destroy_locked(pargs);
+ break;
+ }
+ }
+ gpr_mu_unlock(&g_cvfds.mu);
+ }
+
+ // We still have the lock here
+ if (gpr_unref(&g_cvfds.pollcount)) {
+ gpr_cv_signal(&g_cvfds.shutdown_cv);
}
gpr_mu_unlock(&g_cvfds.mu);
}
@@ -1329,24 +1516,29 @@ static void run_poll(void *arg) {
static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
unsigned int i;
int res, idx;
- gpr_cv *pollcv;
- cv_node *cvn, *prev;
+ cv_node *pollcv;
int skip_poll = 0;
nfds_t nsockfds = 0;
- gpr_thd_id t_id;
- gpr_thd_options opt;
- poll_args *pargs = NULL;
+ poll_result *result = NULL;
gpr_mu_lock(&g_cvfds.mu);
- pollcv = gpr_malloc(sizeof(gpr_cv));
- gpr_cv_init(pollcv);
+ pollcv = gpr_malloc(sizeof(cv_node));
+ pollcv->next = NULL;
+ gpr_cv pollcv_cv;
+ gpr_cv_init(&pollcv_cv);
+ pollcv->cv = &pollcv_cv;
+ cv_node *fd_cvs = gpr_malloc(nfds * sizeof(cv_node));
+
for (i = 0; i < nfds; i++) {
fds[i].revents = 0;
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
idx = FD_TO_IDX(fds[i].fd);
- cvn = gpr_malloc(sizeof(cv_node));
- cvn->cv = pollcv;
- cvn->next = g_cvfds.cvfds[idx].cvs;
- g_cvfds.cvfds[idx].cvs = cvn;
+ fd_cvs[i].cv = &pollcv_cv;
+ fd_cvs[i].prev = NULL;
+ fd_cvs[i].next = g_cvfds.cvfds[idx].cvs;
+ if (g_cvfds.cvfds[idx].cvs) {
+ g_cvfds.cvfds[idx].cvs->prev = &(fd_cvs[i]);
+ }
+ g_cvfds.cvfds[idx].cvs = &(fd_cvs[i]);
// Don't bother polling if a wakeup fd is ready
if (g_cvfds.cvfds[idx].is_set) {
skip_poll = 1;
@@ -1356,81 +1548,68 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
}
}
+ gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
+ if (timeout < 0) {
+ deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ } else {
+ deadline =
+ gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
+ }
+
res = 0;
if (!skip_poll && nsockfds > 0) {
- pargs = gpr_malloc(sizeof(struct poll_args));
- // Both the main thread and calling thread get a reference
- gpr_ref_init(&pargs->refcount, 2);
- pargs->cv = pollcv;
- pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
- pargs->nfds = nsockfds;
- pargs->timeout = timeout;
- pargs->retval = 0;
- pargs->err = 0;
- gpr_atm_no_barrier_store(&pargs->status, INPROGRESS);
+ struct pollfd *pollfds = gpr_malloc(sizeof(struct pollfd) * nsockfds);
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd >= 0) {
- pargs->fds[idx].fd = fds[i].fd;
- pargs->fds[idx].events = fds[i].events;
- pargs->fds[idx].revents = 0;
+ pollfds[idx].fd = fds[i].fd;
+ pollfds[idx].events = fds[i].events;
+ pollfds[idx].revents = 0;
idx++;
}
}
- g_cvfds.pollcount++;
- opt = gpr_thd_options_default();
- gpr_thd_options_set_detached(&opt);
- GPR_ASSERT(gpr_thd_new(&t_id, &run_poll, pargs, &opt));
- // We want the poll() thread to trigger the deadline, so wait forever here
- gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
- if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
- res = pargs->retval;
- errno = pargs->err;
- } else {
- errno = 0;
- gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
+ poll_args *pargs = get_poller_locked(pollfds, nsockfds);
+ result = pargs->result;
+ pollcv->next = result->watchers;
+ pollcv->prev = NULL;
+ if (result->watchers) {
+ result->watchers->prev = pollcv;
}
+ result->watchers = pollcv;
+ result->watchcount++;
+ gpr_ref(&result->refcount);
+
+ pargs->trigger_set = 1;
+ gpr_cv_signal(&pargs->trigger);
+ gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
+ res = result->retval;
+ errno = result->err;
+ result->watchcount--;
+ remove_cvn(&result->watchers, pollcv);
} else if (!skip_poll) {
- gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
- deadline =
- gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
- gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
+ gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
}
idx = 0;
for (i = 0; i < nfds; i++) {
if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
- cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs;
- prev = NULL;
- while (cvn->cv != pollcv) {
- prev = cvn;
- cvn = cvn->next;
- GPR_ASSERT(cvn);
- }
- if (!prev) {
- g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next;
- } else {
- prev->next = cvn->next;
- }
- gpr_free(cvn);
-
+ remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i]));
if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) {
fds[i].revents = POLLIN;
if (res >= 0) res++;
}
- } else if (!skip_poll && fds[i].fd >= 0 &&
- gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
- fds[i].revents = pargs->fds[idx].revents;
+ } else if (!skip_poll && fds[i].fd >= 0 && result->completed) {
+ fds[i].revents = result->fds[idx].revents;
idx++;
}
}
- if (pargs) {
- decref_poll_args(pargs);
- } else {
- gpr_cv_destroy(pollcv);
- gpr_free(pollcv);
+ gpr_free(fd_cvs);
+ gpr_free(pollcv);
+ if (result) {
+ decref_poll_result(result);
}
+
gpr_mu_unlock(&g_cvfds.mu);
return res;
@@ -1439,12 +1618,12 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
static void global_cv_fd_table_init() {
gpr_mu_init(&g_cvfds.mu);
gpr_mu_lock(&g_cvfds.mu);
- gpr_cv_init(&g_cvfds.shutdown_complete);
- g_cvfds.shutdown = 0;
- g_cvfds.pollcount = 0;
+ gpr_cv_init(&g_cvfds.shutdown_cv);
+ gpr_ref_init(&g_cvfds.pollcount, 1);
g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
g_cvfds.free_fds = NULL;
+ thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN);
for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
g_cvfds.cvfds[i].is_set = 0;
g_cvfds.cvfds[i].cvs = NULL;
@@ -1454,23 +1633,35 @@ static void global_cv_fd_table_init() {
// Override the poll function with one that supports cvfds
g_cvfds.poll = grpc_poll_function;
grpc_poll_function = &cvfd_poll;
+
+ // Initialize the cache
+ poll_cache.size = 32;
+ poll_cache.count = 0;
+ poll_cache.free_pollers = NULL;
+ poll_cache.active_pollers = gpr_malloc(sizeof(void *) * 32);
+ for (unsigned int i = 0; i < poll_cache.size; i++) {
+ poll_cache.active_pollers[i] = NULL;
+ }
+
gpr_mu_unlock(&g_cvfds.mu);
}
static void global_cv_fd_table_shutdown() {
gpr_mu_lock(&g_cvfds.mu);
- g_cvfds.shutdown = 1;
// Attempt to wait for all abandoned poll() threads to terminate
// Not doing so will result in reported memory leaks
- if (g_cvfds.pollcount > 0) {
- int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu,
+ if (!gpr_unref(&g_cvfds.pollcount)) {
+ int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_seconds(3, GPR_TIMESPAN)));
GPR_ASSERT(res == 0);
}
- gpr_cv_destroy(&g_cvfds.shutdown_complete);
+ gpr_cv_destroy(&g_cvfds.shutdown_cv);
grpc_poll_function = g_cvfds.poll;
gpr_free(g_cvfds.cvfds);
+
+ gpr_free(poll_cache.active_pollers);
+
gpr_mu_unlock(&g_cvfds.mu);
gpr_mu_destroy(&g_cvfds.mu);
}
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index cff77e641c..91f8cd5482 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -170,8 +170,9 @@ int grpc_fd_wrapped_fd(grpc_fd *fd) {
}
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- int *release_fd, const char *reason) {
- g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, reason);
+ int *release_fd, bool already_closed, const char *reason) {
+ g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, already_closed,
+ reason);
}
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 0de7333843..1108e46ef8 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -37,7 +37,7 @@ typedef struct grpc_event_engine_vtable {
grpc_fd *(*fd_create)(int fd, const char *name);
int (*fd_wrapped_fd)(grpc_fd *fd);
void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- int *release_fd, const char *reason);
+ int *release_fd, bool already_closed, const char *reason);
void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
void (*fd_notify_on_read)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
@@ -104,7 +104,7 @@ int grpc_fd_wrapped_fd(grpc_fd *fd);
notify_on_write.
MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- int *release_fd, const char *reason);
+ int *release_fd, bool already_closed, const char *reason);
/* Has grpc_fd_shutdown been called on an fd? */
bool grpc_fd_is_shutdown(grpc_fd *fd);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 833170ceed..41c69add17 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -51,33 +51,6 @@ bool grpc_exec_ctx_has_work(grpc_exec_ctx *exec_ctx) {
!grpc_closure_list_empty(exec_ctx->closure_list);
}
-bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
- bool did_something = 0;
- GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
- for (;;) {
- if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
- grpc_closure *c = exec_ctx->closure_list.head;
- exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
- while (c != NULL) {
- grpc_closure *next = c->next_data.next;
- grpc_error *error = c->error_data.error;
- did_something = true;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- c = next;
- }
- } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
- break;
- }
- }
- GPR_ASSERT(exec_ctx->active_combiner == NULL);
- GPR_TIMER_END("grpc_exec_ctx_flush", 0);
- return did_something;
-}
-
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
exec_ctx->flags |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
grpc_exec_ctx_flush(exec_ctx);
@@ -103,6 +76,29 @@ static void exec_ctx_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
GRPC_ERROR_UNREF(error);
}
+bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
+ bool did_something = 0;
+ GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
+ for (;;) {
+ if (!grpc_closure_list_empty(exec_ctx->closure_list)) {
+ grpc_closure *c = exec_ctx->closure_list.head;
+ exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ did_something = true;
+ exec_ctx_run(exec_ctx, c, error);
+ c = next;
+ }
+ } else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
+ break;
+ }
+ }
+ GPR_ASSERT(exec_ctx->active_combiner == NULL);
+ GPR_TIMER_END("grpc_exec_ctx_flush", 0);
+ return did_something;
+}
+
static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error) {
grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
diff --git a/src/core/lib/iomgr/gethostname.h b/src/core/lib/iomgr/gethostname.h
new file mode 100644
index 0000000000..9c6b9d8d42
--- /dev/null
+++ b/src/core/lib/iomgr/gethostname.h
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H
+#define GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H
+
+// Returns the hostname of the local machine.
+// Caller takes ownership of result.
+char *grpc_gethostname();
+
+#endif /* GRPC_CORE_LIB_IOMGR_GETHOSTNAME_H */
diff --git a/src/core/lib/iomgr/gethostname_fallback.c b/src/core/lib/iomgr/gethostname_fallback.c
new file mode 100644
index 0000000000..6229461568
--- /dev/null
+++ b/src/core/lib/iomgr/gethostname_fallback.c
@@ -0,0 +1,27 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_GETHOSTNAME_FALLBACK
+
+#include <stddef.h>
+
+char *grpc_gethostname() { return NULL; }
+
+#endif // GRPC_GETHOSTNAME_FALLBACK
diff --git a/src/core/lib/iomgr/gethostname_host_name_max.c b/src/core/lib/iomgr/gethostname_host_name_max.c
new file mode 100644
index 0000000000..4d0511412e
--- /dev/null
+++ b/src/core/lib/iomgr/gethostname_host_name_max.c
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_POSIX_HOST_NAME_MAX
+
+#include <limits.h>
+#include <unistd.h>
+
+#include <grpc/support/alloc.h>
+
+char *grpc_gethostname() {
+ char *hostname = (char *)gpr_malloc(HOST_NAME_MAX);
+ if (gethostname(hostname, HOST_NAME_MAX) != 0) {
+ gpr_free(hostname);
+ return NULL;
+ }
+ return hostname;
+}
+
+#endif // GRPC_POSIX_HOST_NAME_MAX
diff --git a/src/core/lib/iomgr/gethostname_sysconf.c b/src/core/lib/iomgr/gethostname_sysconf.c
new file mode 100644
index 0000000000..51bac5d69d
--- /dev/null
+++ b/src/core/lib/iomgr/gethostname_sysconf.c
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_POSIX_SYSCONF
+
+#include <unistd.h>
+
+#include <grpc/support/alloc.h>
+
+char *grpc_gethostname() {
+ size_t host_name_max = (size_t)sysconf(_SC_HOST_NAME_MAX);
+ char *hostname = (char *)gpr_malloc(host_name_max);
+ if (gethostname(hostname, host_name_max) != 0) {
+ gpr_free(hostname);
+ return NULL;
+ }
+ return hostname;
+}
+
+#endif // GRPC_POSIX_SYSCONF
diff --git a/src/core/lib/iomgr/nameser.h b/src/core/lib/iomgr/nameser.h
new file mode 100644
index 0000000000..daed6de518
--- /dev/null
+++ b/src/core/lib/iomgr/nameser.h
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_NAMESER_H
+#define GRPC_CORE_LIB_IOMGR_NAMESER_H
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_HAVE_ARPA_NAMESER
+
+#include <arpa/nameser.h>
+
+#else /* GRPC_HAVE_ARPA_NAMESER */
+
+typedef enum __ns_class {
+ ns_c_invalid = 0, /* Cookie. */
+ ns_c_in = 1, /* Internet. */
+ ns_c_2 = 2, /* unallocated/unsupported. */
+ ns_c_chaos = 3, /* MIT Chaos-net. */
+ ns_c_hs = 4, /* MIT Hesiod. */
+ /* Query class values which do not appear in resource records */
+ ns_c_none = 254, /* for prereq. sections in update requests */
+ ns_c_any = 255, /* Wildcard match. */
+ ns_c_max = 65536
+} ns_class;
+
+typedef enum __ns_type {
+ ns_t_invalid = 0, /* Cookie. */
+ ns_t_a = 1, /* Host address. */
+ ns_t_ns = 2, /* Authoritative server. */
+ ns_t_md = 3, /* Mail destination. */
+ ns_t_mf = 4, /* Mail forwarder. */
+ ns_t_cname = 5, /* Canonical name. */
+ ns_t_soa = 6, /* Start of authority zone. */
+ ns_t_mb = 7, /* Mailbox domain name. */
+ ns_t_mg = 8, /* Mail group member. */
+ ns_t_mr = 9, /* Mail rename name. */
+ ns_t_null = 10, /* Null resource record. */
+ ns_t_wks = 11, /* Well known service. */
+ ns_t_ptr = 12, /* Domain name pointer. */
+ ns_t_hinfo = 13, /* Host information. */
+ ns_t_minfo = 14, /* Mailbox information. */
+ ns_t_mx = 15, /* Mail routing information. */
+ ns_t_txt = 16, /* Text strings. */
+ ns_t_rp = 17, /* Responsible person. */
+ ns_t_afsdb = 18, /* AFS cell database. */
+ ns_t_x25 = 19, /* X_25 calling address. */
+ ns_t_isdn = 20, /* ISDN calling address. */
+ ns_t_rt = 21, /* Router. */
+ ns_t_nsap = 22, /* NSAP address. */
+ ns_t_nsap_ptr = 23, /* Reverse NSAP lookup (deprecated). */
+ ns_t_sig = 24, /* Security signature. */
+ ns_t_key = 25, /* Security key. */
+ ns_t_px = 26, /* X.400 mail mapping. */
+ ns_t_gpos = 27, /* Geographical position (withdrawn). */
+ ns_t_aaaa = 28, /* Ip6 Address. */
+ ns_t_loc = 29, /* Location Information. */
+ ns_t_nxt = 30, /* Next domain (security). */
+ ns_t_eid = 31, /* Endpoint identifier. */
+ ns_t_nimloc = 32, /* Nimrod Locator. */
+ ns_t_srv = 33, /* Server Selection. */
+ ns_t_atma = 34, /* ATM Address */
+ ns_t_naptr = 35, /* Naming Authority PoinTeR */
+ ns_t_kx = 36, /* Key Exchange */
+ ns_t_cert = 37, /* Certification record */
+ ns_t_a6 = 38, /* IPv6 address (deprecates AAAA) */
+ ns_t_dname = 39, /* Non-terminal DNAME (for IPv6) */
+ ns_t_sink = 40, /* Kitchen sink (experimentatl) */
+ ns_t_opt = 41, /* EDNS0 option (meta-RR) */
+ ns_t_apl = 42, /* Address prefix list (RFC3123) */
+ ns_t_ds = 43, /* Delegation Signer (RFC4034) */
+ ns_t_sshfp = 44, /* SSH Key Fingerprint (RFC4255) */
+ ns_t_rrsig = 46, /* Resource Record Signature (RFC4034) */
+ ns_t_nsec = 47, /* Next Secure (RFC4034) */
+ ns_t_dnskey = 48, /* DNS Public Key (RFC4034) */
+ ns_t_tkey = 249, /* Transaction key */
+ ns_t_tsig = 250, /* Transaction signature. */
+ ns_t_ixfr = 251, /* Incremental zone transfer. */
+ ns_t_axfr = 252, /* Transfer zone of authority. */
+ ns_t_mailb = 253, /* Transfer mailbox records. */
+ ns_t_maila = 254, /* Transfer mail agent records. */
+ ns_t_any = 255, /* Wildcard match. */
+ ns_t_zxfr = 256, /* BIND-specific, nonstandard. */
+ ns_t_max = 65536
+} ns_type;
+
+#endif /* GRPC_HAVE_ARPA_NAMESER */
+
+#endif /* GRPC_CORE_LIB_IOMGR_NAMESER_H */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index f5d15b4850..42033d0ba4 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -24,6 +24,7 @@
#if defined(GRPC_UV)
// Do nothing
#elif defined(GPR_MANYLINUX1)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
@@ -51,12 +52,14 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_LINUX)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
#define GRPC_HAVE_UNIX_SOCKET 1
#define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1
+#define GRPC_POSIX_HOST_NAME_MAX 1
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_WAKEUP_FD 1
@@ -82,6 +85,7 @@
#define GRPC_POSIX_SOCKETUTILS
#endif
#elif defined(GPR_APPLE)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
#define GRPC_HAVE_UNIX_SOCKET 1
@@ -90,9 +94,11 @@
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
+#define GRPC_POSIX_SYSCONF 1
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_FREEBSD)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_SO_NOSIGPIPE 1
@@ -104,6 +110,7 @@
#define GRPC_POSIX_WAKEUP_FD 1
#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_NACL)
+#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
#define GRPC_POSIX_SOCKETADDR 1
@@ -120,4 +127,11 @@
#error Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GPR_CUSTOM_SOCKET
#endif
+#if defined(GRPC_POSIX_HOST_NAME_MAX) && defined(GRPC_POSIX_SYSCONF)
+#error "Cannot define both GRPC_POSIX_HOST_NAME_MAX and GRPC_POSIX_SYSCONF"
+#endif
+#if !defined(GRPC_POSIX_HOST_NAME_MAX) && !defined(GRPC_POSIX_SYSCONF)
+#define GRPC_GETHOSTNAME_FALLBACK 1
+#endif
+
#endif /* GRPC_CORE_LIB_IOMGR_PORT_H */
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 21e320a6e7..a25fba4527 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -209,7 +209,8 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
finish:
if (fd != NULL) {
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
- grpc_fd_orphan(exec_ctx, fd, NULL, NULL, "tcp_client_orphan");
+ grpc_fd_orphan(exec_ctx, fd, NULL, NULL, false /* already_closed */,
+ "tcp_client_orphan");
fd = NULL;
}
done = (--ac->refs == 0);
@@ -295,7 +296,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
- grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
+ grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, false /* already_closed */,
+ "tcp_client_connect_error");
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"));
goto done;
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 9d27de0b1c..1bace788c5 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -296,7 +296,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
- "tcp_unref_orphan");
+ false /* already_closed */, "tcp_unref_orphan");
grpc_slice_buffer_destroy_internal(exec_ctx, &tcp->last_read_buffer);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp->peer_string);
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index f304642951..0fc5c0fd86 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -166,7 +166,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
- "tcp_listener_shutdown");
+ false /* already_closed */, "tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.c b/src/core/lib/iomgr/tcp_server_utils_posix_common.c
index dbb43186bd..ad535bc43e 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.c
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.c
@@ -39,7 +39,7 @@
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
-static gpr_once s_init_max_accept_queue_size;
+static gpr_once s_init_max_accept_queue_size = GPR_ONCE_INIT;
static int s_max_accept_queue_size;
/* get max listen queue size on linux */
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 54e7f417a7..88fa34cb7a 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -214,7 +214,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
sp->server->user_data);
}
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
- "udp_listener_shutdown");
+ false /* already_closed */, "udp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
} else {
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h
index c5dcdc9746..46e84f5843 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.h
+++ b/src/core/lib/iomgr/wakeup_fd_cv.h
@@ -43,6 +43,7 @@
typedef struct cv_node {
gpr_cv* cv;
struct cv_node* next;
+ struct cv_node* prev;
} cv_node;
typedef struct fd_node {
@@ -53,9 +54,8 @@ typedef struct fd_node {
typedef struct cv_fd_table {
gpr_mu mu;
- int pollcount;
- int shutdown;
- gpr_cv shutdown_complete;
+ gpr_refcount pollcount;
+ gpr_cv shutdown_cv;
fd_node* cvfds;
fd_node* free_fds;
unsigned int size;