aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/ev_epollex_linux.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-04-13 15:37:14 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-04-13 15:37:14 -0700
commit9f012514738c560df9ccd72971390a5b097256c0 (patch)
tree9c316e93888caba5164dbd16bdf2f1597c510eb9 /src/core/lib/iomgr/ev_epollex_linux.c
parent7a59b56e53020e655f83d65c165dfb8c228ddf4d (diff)
Working towards a single-fd optimized data structure
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.c')
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c161
1 files changed, 109 insertions, 52 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 0b3e2b13fc..6e6e4460c0 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -170,24 +170,27 @@ static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
struct grpc_pollset_worker {
bool kicked;
bool initialized_cv;
+ bool inserted;
gpr_cv cv;
grpc_pollset_worker *next;
grpc_pollset_worker *prev;
+ grpc_pollset *pollset;
};
-struct grpc_pollset {
+struct pollable {
polling_obj po;
- /* Pollable set - possible values:
- 0 - nothing is pollable
- pointer | 1 - a single pollable file descriptor
- (fd << 1) | 0 - an epoll fd */
- gpr_atm pollable_set_atm;
+ int epfd;
+ grpc_wakeup_fd wakeup;
+ grpc_pollset_worker *root_worker;
int num_pollers;
- bool kicked_without_poller;
gpr_atm shutdown_atm;
+};
+
+struct grpc_pollset {
+ pollable pollable;
+ pollable *current_pollable;
+ bool kicked_without_poller;
grpc_closure *shutdown_closure;
- grpc_wakeup_fd pollset_wakeup;
- grpc_pollset_worker *root_worker;
};
/*******************************************************************************
@@ -573,32 +576,53 @@ static grpc_error *kick_poller(void) {
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
po_init(&pollset->po, PO_POLLSET);
pollset->kicked_without_poller = false;
- pollset->epfd = epoll_create1(EPOLL_CLOEXEC);
- if (pollset->epfd < 0) {
- GRPC_LOG_IF_ERROR("pollset_init", GRPC_OS_ERROR(errno, "epoll_create1"));
+ gpr_atm_no_barrier_store(&pollset->pollable_set_atm, 0);
+ pollset->num_pollers = 0;
+ gpr_atm_no_barrier_store(&pollset->shutdown_atm, 0);
+ pollset->shutdown_closure = NULL;
+ pollset->root_worker = NULL;
+ *mu = &pollset->po.mu;
+}
+
+static grpc_error *multipoller_create(multipoller **out) {
+ multipoller *p = gpr_malloc(sizeof(*p));
+ p->epfd = epoll_create1(EPOLL_CLOEXEC);
+ if (p->epfd < 0) {
+ grpc_error *err = GRPC_OS_ERROR(errno, "epoll_create1");
+ gpr_free(p);
+ return err;
} else {
struct epoll_event ev = {.events = EPOLLIN | EPOLLET | EPOLLEXCLUSIVE,
.data.ptr = &global_wakeup_fd};
- if (epoll_ctl(pollset->epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
- &ev) != 0) {
- GRPC_LOG_IF_ERROR("pollset_init", GRPC_OS_ERROR(errno, "epoll_ctl"));
+ if (epoll_ctl(p->epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
+ grpc_error *err = GRPC_OS_ERROR(errno, "epoll_ctl");
+ close(p->epfd);
+ gpr_free(p);
+ return err;
}
}
- pollset->num_pollers = 0;
- gpr_atm_no_barrier_store(&pollset->shutdown_atm, 0);
- pollset->shutdown_closure = NULL;
- if (GRPC_LOG_IF_ERROR("pollset_init",
- grpc_wakeup_fd_init(&pollset->pollset_wakeup)) &&
- pollset->epfd >= 0) {
- struct epoll_event ev = {.events = EPOLLIN | EPOLLET,
- .data.ptr = &pollset->pollset_wakeup};
- if (epoll_ctl(pollset->epfd, EPOLL_CTL_ADD, pollset->pollset_wakeup.read_fd,
- &ev) != 0) {
- GRPC_LOG_IF_ERROR("pollset_init", GRPC_OS_ERROR(errno, "epoll_ctl"));
- }
+ grpc_error *err = grpc_wakeup_fd_init(&p->wakeup);
+ if (err != GRPC_ERROR_NONE) {
+ close(p->epfd);
+ gpr_free(p);
+ return err;
}
- pollset->root_worker = NULL;
- *mu = &pollset->po.mu;
+ struct epoll_event ev = {.events = EPOLLIN | EPOLLET, .data.ptr = &p->wakeup};
+ if (epoll_ctl(pollset->epfd, EPOLL_CTL_ADD, p->wakeup.read_fd, &ev) != 0) {
+ err = GRPC_OS_ERROR(errno, "epoll_ctl");
+ close(p->epfd);
+ grpc_wakeup_fd_destroy(&p->wakeup);
+ gpr_free(p);
+ return err;
+ }
+ *out = p;
+ return GRPC_ERROR_NONE;
+}
+
+static void multipoller_destroy(multipoller *p) {
+ close(p->epfd);
+ grpc_wakeup_fd_destroy(&p->wakeup);
+ gpr_free(p);
}
/* Convert a timespec to milliseconds:
@@ -678,31 +702,40 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* pollset_shutdown is guaranteed to be called before pollset_destroy. */
static void pollset_destroy(grpc_pollset *pollset) {
po_destroy(&pollset->po);
- if (pollset->epfd >= 0) close(pollset->epfd);
- grpc_wakeup_fd_destroy(&pollset->pollset_wakeup);
+ switch (pollset->occupancy) {
+ case POLLSET_EMPTY:
+ break;
+ case POLLSET_UNARY_FD:
+ UNREF_BY(pollset->pollable.unary_fd, 2);
+ break;
+ case POLLSET_MULTIPOLLER:
+ multipoller_destroy(pollset->pollable.multipoller);
+ break;
+ }
}
#define MAX_EPOLL_EVENTS 100
-static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- gpr_timespec now, gpr_timespec deadline) {
+static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ gpr_timespec now, gpr_timespec deadline) {
struct epoll_event events[MAX_EPOLL_EVENTS];
static const char *err_desc = "pollset_poll";
- if (pollset->epfd < 0) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "epoll fd failed to initialize");
- }
-
- GRPC_SCHEDULING_START_BLOCKING_REGION;
int timeout = poll_deadline_to_millis_timeout(deadline, now);
if (grpc_polling_trace) {
gpr_log(GPR_DEBUG, "PS:%p poll for %dms", pollset, timeout);
}
- int r = epoll_wait(pollset->epfd, events, MAX_EPOLL_EVENTS, timeout);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
+ if (timeout != 0) {
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ }
+ int r = epoll_wait(pollset->pollable.multipoller->epfd, events,
+ MAX_EPOLL_EVENTS, timeout);
+ if (timeout != 0) {
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+ }
+
if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
if (grpc_polling_trace) {
@@ -720,7 +753,7 @@ static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_timer_consume_kick();
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
- } else if (data_ptr == &pollset->pollset_wakeup) {
+ } else if (data_ptr == &pollset->pollable.multipoller->pollset_wakeup) {
if (grpc_polling_trace) {
gpr_log(GPR_DEBUG, "PS:%p poll got pollset_wakeup", pollset);
}
@@ -728,7 +761,9 @@ static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
the fd is level triggered and non-exclusive, which should result in all
pollers waking */
if (gpr_atm_no_barrier_load(&pollset->shutdown_atm) == 0) {
- append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
+ append_error(&error,
+ grpc_wakeup_fd_consume_wakeup(
+ &pollset->pollable.multipoller->pollset_wakeup),
err_desc);
}
} else {
@@ -767,20 +802,29 @@ static grpc_error *pollset_poll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl,
gpr_timespec deadline) {
- if (worker_hdl != NULL) {
- *worker_hdl = worker;
+ worker->initialized_cv = false;
+ worker->inserted = false;
+ if (worker_hdl != NULL || pollset->occupancy != POLLSET_MULTIPOLLER) {
+ if (worker_hdl != NULL) *worker_hdl = worker;
worker->kicked = false;
+ worker->inserted = true;
if (pollset->root_worker == NULL) {
pollset->root_worker = worker;
worker->next = worker->prev = worker;
- worker->initialized_cv = false;
+ if (pollset->occupancy == POLLSET_EMPTY) {
+ worker->initialized_cv = true;
+ }
} else {
worker->next = pollset->root_worker;
worker->prev = worker->next->prev;
worker->next->prev = worker->prev->next = worker;
worker->initialized_cv = true;
+ }
+ if (worker->initialized_cv) {
+ GPR_ASSERT(worker->inserted);
gpr_cv_init(&worker->cv);
- while (pollset->root_worker != worker) {
+ while (pollset->root_worker != worker ||
+ pollset->occupancy == POLLSET_EMPTY) {
if (gpr_cv_wait(&worker->cv, &pollset->po.mu, deadline)) return false;
if (worker->kicked) return false;
}
@@ -791,7 +835,7 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
static void end_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
- if (worker_hdl != NULL) {
+ if (worker->inserted) {
if (worker == pollset->root_worker) {
if (worker == worker->next) {
pollset->root_worker = NULL;
@@ -819,6 +863,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker;
+ grpc_pollset_worker *fake_worker_hdl;
if (grpc_polling_trace) {
gpr_log(GPR_DEBUG, "PS:%p work hdl=%p worker=%p now=%" PRId64
".%09d deadline=%" PRId64 ".%09d kwp=%d root_worker=%p",
@@ -836,10 +881,22 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
GPR_ASSERT(!pollset->shutdown_closure);
pollset->num_pollers++;
- gpr_mu_unlock(&pollset->po.mu);
- error = pollset_poll(exec_ctx, pollset, now, deadline);
- grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->po.mu);
+ switch (pollset->occupancy) {
+ case POLLSET_EMPTY:
+ GPR_UNREACHABLE_CODE(break);
+ case POLLSET_UNARY_FD:
+ gpr_mu_unlock(&pollset->po.mu);
+ error = pollset_poll(exec_ctx, pollset, now, deadline);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->po.mu);
+ break;
+ case POLLSET_MULTIPOLLER:
+ gpr_mu_unlock(&pollset->po.mu);
+ error = pollset_epoll(exec_ctx, pollset, now, deadline);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->po.mu);
+ break;
+ }
gpr_tls_set(&g_current_thread_pollset, 0);
gpr_tls_set(&g_current_thread_worker, 0);
pollset->num_pollers--;